488 lines
15 KiB
Go
488 lines
15 KiB
Go
package handlers
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"strconv" // Added strconv for JobID parsing
|
|
|
|
"git.ourworld.tf/herocode/heroagent/pkg/herojobs"
|
|
"github.com/gofiber/fiber/v2"
|
|
)
|
|
|
|
// RedisClientInterface defines the methods JobHandler needs from a HeroJobs Redis client.
|
|
type RedisClientInterface interface {
|
|
StoreJob(job *herojobs.Job) error
|
|
EnqueueJob(job *herojobs.Job) error
|
|
GetJob(jobID interface{}) (*herojobs.Job, error) // Changed jobID type to interface{}
|
|
ListJobs(circleID, topic string) ([]uint32, error)
|
|
QueueSize(circleID, topic string) (int64, error)
|
|
QueueEmpty(circleID, topic string) error
|
|
// herojobs.Job also has Load() and Save() methods, but those are on the Job object itself,
|
|
// not typically part of the client interface unless the client is a facade for all job operations.
|
|
}
|
|
|
|
// JobHandler handles job-related routes
|
|
type JobHandler struct {
|
|
client RedisClientInterface // Changed to use the interface
|
|
logger *log.Logger
|
|
}
|
|
|
|
// NewJobHandler creates a new JobHandler
|
|
func NewJobHandler(redisAddr string, logger *log.Logger) (*JobHandler, error) {
|
|
redisClient, err := herojobs.NewRedisClient(redisAddr, false)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create HeroJobs Redis client: %w", err)
|
|
}
|
|
// *herojobs.RedisClient must implement RedisClientInterface.
|
|
// This assignment is valid if *herojobs.RedisClient has all methods of RedisClientInterface.
|
|
return &JobHandler{
|
|
client: redisClient,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
// RegisterRoutes registers job API routes
|
|
func (h *JobHandler) RegisterRoutes(app *fiber.App) {
|
|
// Register common routes to both API and admin groups
|
|
jobRoutes := func(group fiber.Router) {
|
|
group.Post("/submit", h.submitJob)
|
|
group.Get("/get/:id", h.getJob)
|
|
group.Delete("/delete/:id", h.deleteJob)
|
|
group.Get("/list", h.listJobs)
|
|
group.Get("/queue/size", h.queueSize)
|
|
group.Post("/queue/empty", h.queueEmpty)
|
|
group.Get("/queue/get", h.queueGet)
|
|
group.Post("/create", h.createJob)
|
|
}
|
|
|
|
// Apply common routes to API group
|
|
apiJobs := app.Group("/api/jobs")
|
|
jobRoutes(apiJobs)
|
|
|
|
// Apply common routes to admin group
|
|
adminJobs := app.Group("/admin/jobs")
|
|
jobRoutes(adminJobs)
|
|
}
|
|
|
|
// @Summary Submit a job
|
|
// @Description Submit a new job to the HeroJobs server
|
|
// @Tags jobs
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param job body herojobs.Job true "Job to submit"
|
|
// @Success 200 {object} herojobs.Job
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/submit [post]
|
|
// @Router /admin/jobs/submit [post]
|
|
func (h *JobHandler) submitJob(c *fiber.Ctx) error {
|
|
// Parse job from request body
|
|
var job herojobs.Job
|
|
if err := c.BodyParser(&job); err != nil {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to parse job data: %v", err),
|
|
})
|
|
}
|
|
|
|
// Save job to OurDB (this assigns/confirms JobID)
|
|
if err := job.Save(); err != nil {
|
|
h.logger.Printf("Failed to save job to OurDB: %v", err)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to save job: %v", err),
|
|
})
|
|
}
|
|
|
|
// Store job in Redis
|
|
if err := h.client.StoreJob(&job); err != nil {
|
|
h.logger.Printf("Failed to store job in Redis: %v", err)
|
|
// Attempt to roll back or log, but proceed to enqueue if critical
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to store job in Redis: %v", err),
|
|
})
|
|
}
|
|
|
|
// Enqueue job in Redis
|
|
if err := h.client.EnqueueJob(&job); err != nil {
|
|
h.logger.Printf("Failed to enqueue job in Redis: %v", err)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to enqueue job: %v", err),
|
|
})
|
|
}
|
|
|
|
return c.JSON(job)
|
|
}
|
|
|
|
// @Summary Get a job
|
|
// @Description Get a job by ID
|
|
// @Tags jobs
|
|
// @Produce json
|
|
// @Param id path string true "Job ID"
|
|
// @Success 200 {object} herojobs.Job
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/get/{id} [get]
|
|
// @Router /admin/jobs/get/{id} [get]
|
|
func (h *JobHandler) getJob(c *fiber.Ctx) error {
|
|
// Get job ID from path parameter
|
|
jobIDStr := c.Params("id")
|
|
if jobIDStr == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Job ID is required",
|
|
})
|
|
}
|
|
|
|
// Convert jobID string to uint32
|
|
jobID64, err := strconv.ParseUint(jobIDStr, 10, 32)
|
|
if err != nil {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Invalid Job ID format: %s. %v", jobIDStr, err),
|
|
})
|
|
}
|
|
jobID := uint32(jobID64)
|
|
|
|
// Get job from Redis first
|
|
job, err := h.client.GetJob(jobID)
|
|
if err != nil {
|
|
// If not found in Redis (e.g. redis.Nil or other error), try OurDB
|
|
h.logger.Printf("Job %d not found in Redis or error: %v. Trying OurDB.", jobID, err)
|
|
retrievedJob := &herojobs.Job{JobID: jobID}
|
|
if loadErr := retrievedJob.Load(); loadErr != nil {
|
|
h.logger.Printf("Failed to load job %d from OurDB: %v", jobID, loadErr)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to get job %d: %v / %v", jobID, err, loadErr),
|
|
})
|
|
}
|
|
job = retrievedJob // Use the job loaded from OurDB
|
|
}
|
|
|
|
return c.JSON(job)
|
|
}
|
|
|
|
// @Summary Delete a job
|
|
// @Description Delete a job by ID
|
|
// @Tags jobs
|
|
// @Produce json
|
|
// @Param id path string true "Job ID"
|
|
// @Success 200 {object} map[string]string
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/delete/{id} [delete]
|
|
// @Router /admin/jobs/delete/{id} [delete]
|
|
func (h *JobHandler) deleteJob(c *fiber.Ctx) error {
|
|
// Get job ID from path parameter
|
|
jobIDStr := c.Params("id")
|
|
if jobIDStr == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Job ID is required",
|
|
})
|
|
}
|
|
|
|
// Deleting jobs requires removing from OurDB and Redis.
|
|
// This functionality is not directly provided by RedisClient.DeleteJob
|
|
// and OurDB job deletion is not specified in README.
|
|
// For now, returning not implemented.
|
|
h.logger.Printf("Attempt to delete job %s - not implemented", jobIDStr)
|
|
return c.Status(fiber.StatusNotImplemented).JSON(fiber.Map{
|
|
"error": "Job deletion is not implemented",
|
|
"message": fmt.Sprintf("Job %s deletion requested but not implemented.", jobIDStr),
|
|
})
|
|
}
|
|
|
|
// @Summary List jobs
|
|
// @Description List jobs by circle ID and topic
|
|
// @Tags jobs
|
|
// @Produce json
|
|
// @Param circleid query string true "Circle ID"
|
|
// @Param topic query string true "Topic"
|
|
// @Success 200 {object} map[string][]string
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/list [get]
|
|
// @Router /admin/jobs/list [get]
|
|
func (h *JobHandler) listJobs(c *fiber.Ctx) error {
|
|
// Get parameters from query
|
|
circleID := c.Query("circleid")
|
|
if circleID == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Circle ID is required",
|
|
})
|
|
}
|
|
|
|
topic := c.Query("topic")
|
|
if topic == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Topic is required",
|
|
})
|
|
}
|
|
|
|
// List jobs
|
|
jobs, err := h.client.ListJobs(circleID, topic)
|
|
if err != nil {
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to list jobs: %v", err),
|
|
})
|
|
}
|
|
|
|
return c.JSON(fiber.Map{
|
|
"status": "success",
|
|
"jobs": jobs,
|
|
})
|
|
}
|
|
|
|
// @Summary Get queue size
|
|
// @Description Get the size of a job queue by circle ID and topic
|
|
// @Tags jobs
|
|
// @Produce json
|
|
// @Param circleid query string true "Circle ID"
|
|
// @Param topic query string true "Topic"
|
|
// @Success 200 {object} map[string]int64
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/queue/size [get]
|
|
// @Router /admin/jobs/queue/size [get]
|
|
func (h *JobHandler) queueSize(c *fiber.Ctx) error {
|
|
// Get parameters from query
|
|
circleID := c.Query("circleid")
|
|
if circleID == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Circle ID is required",
|
|
})
|
|
}
|
|
|
|
topic := c.Query("topic")
|
|
if topic == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Topic is required",
|
|
})
|
|
}
|
|
|
|
// Get queue size
|
|
size, err := h.client.QueueSize(circleID, topic)
|
|
if err != nil {
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to get queue size: %v", err),
|
|
})
|
|
}
|
|
|
|
return c.JSON(fiber.Map{
|
|
"status": "success",
|
|
"size": size,
|
|
})
|
|
}
|
|
|
|
// @Summary Empty queue
|
|
// @Description Empty a job queue by circle ID and topic
|
|
// @Tags jobs
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param body body object true "Queue parameters"
|
|
// @Success 200 {object} map[string]string
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/queue/empty [post]
|
|
// @Router /admin/jobs/queue/empty [post]
|
|
func (h *JobHandler) queueEmpty(c *fiber.Ctx) error {
|
|
// Parse parameters from request body
|
|
var params struct {
|
|
CircleID string `json:"circleid"`
|
|
Topic string `json:"topic"`
|
|
}
|
|
if err := c.BodyParser(¶ms); err != nil {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to parse parameters: %v", err),
|
|
})
|
|
}
|
|
|
|
if params.CircleID == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Circle ID is required",
|
|
})
|
|
}
|
|
|
|
if params.Topic == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Topic is required",
|
|
})
|
|
}
|
|
|
|
// Empty queue
|
|
if err := h.client.QueueEmpty(params.CircleID, params.Topic); err != nil {
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to empty queue: %v", err),
|
|
})
|
|
}
|
|
|
|
return c.JSON(fiber.Map{
|
|
"status": "success",
|
|
"message": fmt.Sprintf("Queue for circle %s and topic %s emptied successfully", params.CircleID, params.Topic),
|
|
})
|
|
}
|
|
|
|
// @Summary Get job from queue
|
|
// @Description Get a job from a queue without removing it
|
|
// @Tags jobs
|
|
// @Produce json
|
|
// @Param circleid query string true "Circle ID"
|
|
// @Param topic query string true "Topic"
|
|
// @Success 200 {object} herojobs.Job
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/queue/get [get]
|
|
// @Router /admin/jobs/queue/get [get]
|
|
func (h *JobHandler) queueGet(c *fiber.Ctx) error {
|
|
// Get parameters from query
|
|
circleID := c.Query("circleid")
|
|
if circleID == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Circle ID is required",
|
|
})
|
|
}
|
|
|
|
topic := c.Query("topic")
|
|
if topic == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Topic is required",
|
|
})
|
|
}
|
|
|
|
// Get list of job IDs (uint32) from the queue (non-destructive)
|
|
jobIDs, err := h.client.ListJobs(circleID, topic)
|
|
if err != nil {
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to list jobs in queue: %v", err),
|
|
})
|
|
}
|
|
|
|
if len(jobIDs) == 0 {
|
|
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
|
|
"error": "Queue is empty or no jobs found",
|
|
})
|
|
}
|
|
|
|
// Take the first job ID from the list (it's already uint32)
|
|
jobIDToFetch := jobIDs[0]
|
|
|
|
// Get the actual job details using the ID
|
|
job, err := h.client.GetJob(jobIDToFetch)
|
|
if err != nil {
|
|
// If not found in Redis (e.g. redis.Nil or other error), try OurDB
|
|
h.logger.Printf("Job %d (from queue list) not found in Redis or error: %v. Trying OurDB.", jobIDToFetch, err)
|
|
retrievedJob := &herojobs.Job{JobID: jobIDToFetch} // Ensure CircleID and Topic are set if Load needs them
|
|
retrievedJob.CircleID = circleID // Needed for Load if path depends on it
|
|
retrievedJob.Topic = topic // Needed for Load if path depends on it
|
|
if loadErr := retrievedJob.Load(); loadErr != nil {
|
|
h.logger.Printf("Failed to load job %d from OurDB: %v", jobIDToFetch, loadErr)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to get job %d from queue (Redis err: %v / OurDB err: %v)", jobIDToFetch, err, loadErr),
|
|
})
|
|
}
|
|
job = retrievedJob // Use the job loaded from OurDB
|
|
}
|
|
|
|
return c.JSON(job)
|
|
}
|
|
|
|
// @Summary Create job
|
|
// @Description Create a new job with the given parameters
|
|
// @Tags jobs
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param body body object true "Job parameters"
|
|
// @Success 200 {object} herojobs.Job
|
|
// @Failure 400 {object} map[string]string
|
|
// @Failure 500 {object} map[string]string
|
|
// @Router /api/jobs/create [post]
|
|
// @Router /admin/jobs/create [post]
|
|
func (h *JobHandler) createJob(c *fiber.Ctx) error {
|
|
// Parse parameters from request body
|
|
var reqBody struct {
|
|
CircleID string `json:"circleid"`
|
|
Topic string `json:"topic"`
|
|
SessionKey string `json:"sessionkey"`
|
|
Params string `json:"params"`
|
|
ParamsType string `json:"paramstype"`
|
|
Timeout int64 `json:"timeout"` // Optional: allow timeout override
|
|
Log bool `json:"log"` // Optional: allow log enabling
|
|
}
|
|
if err := c.BodyParser(&reqBody); err != nil {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to parse parameters: %v", err),
|
|
})
|
|
}
|
|
|
|
if reqBody.CircleID == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Circle ID is required",
|
|
})
|
|
}
|
|
if reqBody.Topic == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Topic is required",
|
|
})
|
|
}
|
|
if reqBody.Params == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "Params are required",
|
|
})
|
|
}
|
|
if reqBody.ParamsType == "" {
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": "ParamsType is required",
|
|
})
|
|
}
|
|
|
|
// Create a new job instance
|
|
job := herojobs.NewJob() // Initializes with defaults
|
|
job.CircleID = reqBody.CircleID
|
|
job.Topic = reqBody.Topic
|
|
job.SessionKey = reqBody.SessionKey
|
|
job.Params = reqBody.Params
|
|
|
|
// Convert ParamsType string to herojobs.ParamsType
|
|
switch herojobs.ParamsType(reqBody.ParamsType) {
|
|
case herojobs.ParamsTypeHeroScript:
|
|
job.ParamsType = herojobs.ParamsTypeHeroScript
|
|
case herojobs.ParamsTypeRhaiScript:
|
|
job.ParamsType = herojobs.ParamsTypeRhaiScript
|
|
case herojobs.ParamsTypeOpenRPC:
|
|
job.ParamsType = herojobs.ParamsTypeOpenRPC
|
|
case herojobs.ParamsTypeAI:
|
|
job.ParamsType = herojobs.ParamsTypeAI
|
|
default:
|
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Invalid ParamsType: %s", reqBody.ParamsType),
|
|
})
|
|
}
|
|
|
|
if reqBody.Timeout > 0 {
|
|
job.Timeout = reqBody.Timeout
|
|
}
|
|
job.Log = reqBody.Log
|
|
|
|
// Save job to OurDB (this assigns JobID)
|
|
if err := job.Save(); err != nil {
|
|
h.logger.Printf("Failed to save new job to OurDB: %v", err)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to save new job: %v", err),
|
|
})
|
|
}
|
|
|
|
// Store job in Redis
|
|
if err := h.client.StoreJob(job); err != nil {
|
|
h.logger.Printf("Failed to store new job in Redis: %v", err)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to store new job in Redis: %v", err),
|
|
})
|
|
}
|
|
|
|
// Enqueue job in Redis
|
|
if err := h.client.EnqueueJob(job); err != nil {
|
|
h.logger.Printf("Failed to enqueue new job in Redis: %v", err)
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to enqueue new job: %v", err),
|
|
})
|
|
}
|
|
|
|
return c.JSON(job)
|
|
}
|