This repository has been archived on 2025-08-04. You can view files and clone it, but cannot push or open issues or pull requests.
heroagent_go_old/pkg/heroagent/handlers/job_handlers.go
2025-05-23 13:44:18 +04:00

488 lines
15 KiB
Go

package handlers
import (
"fmt"
"log"
"strconv" // Added strconv for JobID parsing
"github.com/freeflowuniverse/heroagent/pkg/herojobs"
"github.com/gofiber/fiber/v2"
)
// RedisClientInterface is the subset of a HeroJobs Redis client that JobHandler
// calls. NewJobHandler stores the value returned by herojobs.NewRedisClient in
// a field of this type, so that client must provide every method listed here.
//
// Job.Load and Job.Save are intentionally absent: the handlers invoke them on
// the herojobs.Job value itself, not through the client.
type RedisClientInterface interface {
	// Job storage.

	// GetJob looks up a job by ID. The parameter is untyped; the handlers in
	// this file always pass a uint32.
	GetJob(jobID interface{}) (*herojobs.Job, error)
	// StoreJob is called by the submit/create handlers to store a job in Redis
	// after it has been saved.
	StoreJob(job *herojobs.Job) error

	// Queue operations, addressed by circle ID and topic.

	// EnqueueJob is called by the submit/create handlers after StoreJob.
	EnqueueJob(job *herojobs.Job) error
	// ListJobs returns job IDs for the given circle ID and topic.
	ListJobs(circleID string, topic string) ([]uint32, error)
	// QueueSize returns the size reported for the circle/topic queue.
	QueueSize(circleID string, topic string) (int64, error)
	// QueueEmpty is called by the queue/empty handler to empty the
	// circle/topic queue.
	QueueEmpty(circleID string, topic string) error
}
// JobHandler serves the job-related HTTP routes registered by RegisterRoutes.
// Instances are created with NewJobHandler.
type JobHandler struct {
client RedisClientInterface // client used for all Redis job and queue calls, held behind the interface
// logger receives the diagnostic messages the handlers emit on failure paths.
logger *log.Logger
}
// NewJobHandler builds a JobHandler backed by a HeroJobs Redis client.
//
// redisAddr is passed unchanged to herojobs.NewRedisClient (with its second
// argument set to false). logger receives the handlers' diagnostics; if it is
// nil, the standard library's default logger is used instead, because the
// handlers call logger.Printf unconditionally on their failure paths and a nil
// *log.Logger would panic there.
//
// It returns a wrapped error when the Redis client cannot be created.
func NewJobHandler(redisAddr string, logger *log.Logger) (*JobHandler, error) {
	redisClient, err := herojobs.NewRedisClient(redisAddr, false)
	if err != nil {
		return nil, fmt.Errorf("failed to create HeroJobs Redis client: %w", err)
	}

	if logger == nil {
		logger = log.Default()
	}

	// Storing redisClient in the interface-typed field only compiles if the
	// concrete client provides every RedisClientInterface method.
	return &JobHandler{
		client: redisClient,
		logger: logger,
	}, nil
}
// RegisterRoutes mounts the job endpoints on app. The same set of routes is
// registered twice: first under /api/jobs, then under /admin/jobs.
func (h *JobHandler) RegisterRoutes(app *fiber.App) {
	for _, prefix := range []string{"/api/jobs", "/admin/jobs"} {
		jobs := app.Group(prefix)

		jobs.Post("/submit", h.submitJob)
		jobs.Get("/get/:id", h.getJob)
		jobs.Delete("/delete/:id", h.deleteJob)
		jobs.Get("/list", h.listJobs)
		jobs.Get("/queue/size", h.queueSize)
		jobs.Post("/queue/empty", h.queueEmpty)
		jobs.Get("/queue/get", h.queueGet)
		jobs.Post("/create", h.createJob)
	}
}
// submitJob takes a complete job from the request body, saves it, stores it in
// Redis and enqueues it, in that order. The first step that fails is answered
// with an error response and ends the request; nothing written by an earlier
// step is undone.
//
// @Summary Submit a job
// @Description Submit a new job to the HeroJobs server
// @Tags jobs
// @Accept json
// @Produce json
// @Param job body herojobs.Job true "Job to submit"
// @Success 200 {object} herojobs.Job
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/submit [post]
// @Router /admin/jobs/submit [post]
func (h *JobHandler) submitJob(c *fiber.Ctx) error {
	// fail writes the {"error": ...} envelope shared by every failure path.
	fail := func(status int, message string) error {
		return c.Status(status).JSON(fiber.Map{"error": message})
	}

	var job herojobs.Job
	if parseErr := c.BodyParser(&job); parseErr != nil {
		return fail(fiber.StatusBadRequest, fmt.Sprintf("Failed to parse job data: %v", parseErr))
	}

	// Step 1: OurDB. Per the original note, Save assigns or confirms the JobID.
	if saveErr := job.Save(); saveErr != nil {
		h.logger.Printf("Failed to save job to OurDB: %v", saveErr)
		return fail(fiber.StatusInternalServerError, fmt.Sprintf("Failed to save job: %v", saveErr))
	}

	// Step 2: Redis copy of the job.
	if storeErr := h.client.StoreJob(&job); storeErr != nil {
		h.logger.Printf("Failed to store job in Redis: %v", storeErr)
		return fail(fiber.StatusInternalServerError, fmt.Sprintf("Failed to store job in Redis: %v", storeErr))
	}

	// Step 3: put the job on its queue.
	if enqueueErr := h.client.EnqueueJob(&job); enqueueErr != nil {
		h.logger.Printf("Failed to enqueue job in Redis: %v", enqueueErr)
		return fail(fiber.StatusInternalServerError, fmt.Sprintf("Failed to enqueue job: %v", enqueueErr))
	}

	return c.JSON(job)
}
// getJob returns the job whose ID is given in the :id path parameter. The ID
// must parse as a base-10 unsigned 32-bit integer. Redis is asked first; on any
// Redis error the job is loaded from OurDB instead, and only if that also fails
// is a 500 returned.
//
// @Summary Get a job
// @Description Get a job by ID
// @Tags jobs
// @Produce json
// @Param id path string true "Job ID"
// @Success 200 {object} herojobs.Job
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/get/{id} [get]
// @Router /admin/jobs/get/{id} [get]
func (h *JobHandler) getJob(c *fiber.Ctx) error {
	rawID := c.Params("id")
	if rawID == "" {
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
			"error": "Job ID is required",
		})
	}

	// bitSize 32 makes ParseUint reject values that do not fit in a uint32.
	parsed, parseErr := strconv.ParseUint(rawID, 10, 32)
	if parseErr != nil {
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
			"error": fmt.Sprintf("Invalid Job ID format: %s. %v", rawID, parseErr),
		})
	}
	id := uint32(parsed)

	fromRedis, redisErr := h.client.GetJob(id)
	if redisErr == nil {
		return c.JSON(fromRedis)
	}

	// Any Redis error (missing key or otherwise) triggers the OurDB fallback.
	h.logger.Printf("Job %d not found in Redis or error: %v. Trying OurDB.", id, redisErr)
	fromDB := &herojobs.Job{JobID: id}
	if loadErr := fromDB.Load(); loadErr != nil {
		h.logger.Printf("Failed to load job %d from OurDB: %v", id, loadErr)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": fmt.Sprintf("Failed to get job %d: %v / %v", id, redisErr, loadErr),
		})
	}
	return c.JSON(fromDB)
}
// deleteJob is a placeholder: deletion is not implemented. For any non-empty
// :id it logs the attempt and answers 501 Not Implemented; an empty ID yields
// 400. The ID is not parsed or otherwise validated.
//
// @Summary Delete a job
// @Description Delete a job by ID
// @Tags jobs
// @Produce json
// @Param id path string true "Job ID"
// @Success 200 {object} map[string]string
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Failure 501 {object} map[string]string
// @Router /api/jobs/delete/{id} [delete]
// @Router /admin/jobs/delete/{id} [delete]
func (h *JobHandler) deleteJob(c *fiber.Ctx) error {
	id := c.Params("id")
	if id == "" {
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Job ID is required"})
	}

	// A real delete would have to remove the job from both OurDB and Redis;
	// neither operation is available to this handler yet.
	h.logger.Printf("Attempt to delete job %s - not implemented", id)

	notImplemented := fiber.Map{
		"error":   "Job deletion is not implemented",
		"message": fmt.Sprintf("Job %s deletion requested but not implemented.", id),
	}
	return c.Status(fiber.StatusNotImplemented).JSON(notImplemented)
}
// listJobs returns the job IDs reported by the Redis client for the circle ID
// and topic given as query parameters. Both parameters are mandatory. The
// success body carries a "status" string and the "jobs" ID list.
//
// @Summary List jobs
// @Description List jobs by circle ID and topic
// @Tags jobs
// @Produce json
// @Param circleid query string true "Circle ID"
// @Param topic query string true "Topic"
// @Success 200 {object} map[string]interface{}
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/list [get]
// @Router /admin/jobs/list [get]
func (h *JobHandler) listJobs(c *fiber.Ctx) error {
	circleID, topic := c.Query("circleid"), c.Query("topic")

	// Circle ID is checked before topic so that a request missing both gets
	// the circle ID message.
	switch {
	case circleID == "":
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Circle ID is required"})
	case topic == "":
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Topic is required"})
	}

	ids, listErr := h.client.ListJobs(circleID, topic)
	if listErr != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": fmt.Sprintf("Failed to list jobs: %v", listErr),
		})
	}

	return c.JSON(fiber.Map{
		"status": "success",
		"jobs":   ids,
	})
}
// queueSize reports the size the Redis client returns for the queue identified
// by the circleid and topic query parameters. Both parameters are mandatory.
// The success body carries a "status" string and the numeric "size".
//
// @Summary Get queue size
// @Description Get the size of a job queue by circle ID and topic
// @Tags jobs
// @Produce json
// @Param circleid query string true "Circle ID"
// @Param topic query string true "Topic"
// @Success 200 {object} map[string]interface{}
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/queue/size [get]
// @Router /admin/jobs/queue/size [get]
func (h *JobHandler) queueSize(c *fiber.Ctx) error {
	// reject answers with the given status and a single-field error body.
	reject := func(status int, message string) error {
		return c.Status(status).JSON(fiber.Map{"error": message})
	}

	circleID := c.Query("circleid")
	topic := c.Query("topic")
	if circleID == "" {
		return reject(fiber.StatusBadRequest, "Circle ID is required")
	}
	if topic == "" {
		return reject(fiber.StatusBadRequest, "Topic is required")
	}

	queued, sizeErr := h.client.QueueSize(circleID, topic)
	if sizeErr != nil {
		return reject(fiber.StatusInternalServerError, fmt.Sprintf("Failed to get queue size: %v", sizeErr))
	}

	return c.JSON(fiber.Map{
		"status": "success",
		"size":   queued,
	})
}
// queueEmpty empties the queue named by the "circleid" and "topic" fields of
// the request body. Both fields are mandatory; a body that cannot be parsed or
// lacks either field is answered with 400.
//
// @Summary Empty queue
// @Description Empty a job queue by circle ID and topic
// @Tags jobs
// @Accept json
// @Produce json
// @Param body body object true "Queue parameters"
// @Success 200 {object} map[string]string
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/queue/empty [post]
// @Router /admin/jobs/queue/empty [post]
func (h *JobHandler) queueEmpty(c *fiber.Ctx) error {
	// reject answers with the given status and a single-field error body.
	reject := func(status int, message string) error {
		return c.Status(status).JSON(fiber.Map{"error": message})
	}

	var target struct {
		CircleID string `json:"circleid"`
		Topic    string `json:"topic"`
	}
	if parseErr := c.BodyParser(&target); parseErr != nil {
		return reject(fiber.StatusBadRequest, fmt.Sprintf("Failed to parse parameters: %v", parseErr))
	}

	switch {
	case target.CircleID == "":
		return reject(fiber.StatusBadRequest, "Circle ID is required")
	case target.Topic == "":
		return reject(fiber.StatusBadRequest, "Topic is required")
	}

	if emptyErr := h.client.QueueEmpty(target.CircleID, target.Topic); emptyErr != nil {
		return reject(fiber.StatusInternalServerError, fmt.Sprintf("Failed to empty queue: %v", emptyErr))
	}

	confirmation := fmt.Sprintf("Queue for circle %s and topic %s emptied successfully", target.CircleID, target.Topic)
	return c.JSON(fiber.Map{
		"status":  "success",
		"message": confirmation,
	})
}
// queueGet returns one job from the circle/topic queue without modifying the
// queue. It lists the job IDs via the client, takes the first ID in that list
// and fetches the job: Redis first, then OurDB if Redis returns any error. An
// empty ID list is answered with 404.
//
// @Summary Get job from queue
// @Description Get a job from a queue without removing it
// @Tags jobs
// @Produce json
// @Param circleid query string true "Circle ID"
// @Param topic query string true "Topic"
// @Success 200 {object} herojobs.Job
// @Failure 400 {object} map[string]string
// @Failure 404 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/queue/get [get]
// @Router /admin/jobs/queue/get [get]
func (h *JobHandler) queueGet(c *fiber.Ctx) error {
	circleID, topic := c.Query("circleid"), c.Query("topic")
	switch {
	case circleID == "":
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Circle ID is required"})
	case topic == "":
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Topic is required"})
	}

	// ListJobs only reads, so nothing is dequeued by this handler.
	ids, listErr := h.client.ListJobs(circleID, topic)
	if listErr != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": fmt.Sprintf("Failed to list jobs in queue: %v", listErr),
		})
	}
	if len(ids) == 0 {
		return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
			"error": "Queue is empty or no jobs found",
		})
	}

	firstID := ids[0]
	fromRedis, redisErr := h.client.GetJob(firstID)
	if redisErr == nil {
		return c.JSON(fromRedis)
	}

	h.logger.Printf("Job %d (from queue list) not found in Redis or error: %v. Trying OurDB.", firstID, redisErr)

	// CircleID and Topic are filled in as well in case Load needs them to
	// locate the record.
	fromDB := &herojobs.Job{
		JobID:    firstID,
		CircleID: circleID,
		Topic:    topic,
	}
	if loadErr := fromDB.Load(); loadErr != nil {
		h.logger.Printf("Failed to load job %d from OurDB: %v", firstID, loadErr)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": fmt.Sprintf("Failed to get job %d from queue (Redis err: %v / OurDB err: %v)", firstID, redisErr, loadErr),
		})
	}
	return c.JSON(fromDB)
}
// createJob builds a job from individual request fields rather than a full job
// document. circleid, topic, params and paramstype are mandatory, and
// paramstype must match one of the four herojobs.ParamsType constants handled
// below. timeout overrides the default from herojobs.NewJob only when it is
// positive; log is always copied. The job is then saved, stored in Redis and
// enqueued, in that order, with the first failure ending the request.
//
// @Summary Create job
// @Description Create a new job with the given parameters
// @Tags jobs
// @Accept json
// @Produce json
// @Param body body object true "Job parameters"
// @Success 200 {object} herojobs.Job
// @Failure 400 {object} map[string]string
// @Failure 500 {object} map[string]string
// @Router /api/jobs/create [post]
// @Router /admin/jobs/create [post]
func (h *JobHandler) createJob(c *fiber.Ctx) error {
	// reject answers with the given status and a single-field error body.
	reject := func(status int, message string) error {
		return c.Status(status).JSON(fiber.Map{"error": message})
	}

	var reqBody struct {
		CircleID   string `json:"circleid"`
		Topic      string `json:"topic"`
		SessionKey string `json:"sessionkey"`
		Params     string `json:"params"`
		ParamsType string `json:"paramstype"`
		Timeout    int64  `json:"timeout"` // optional override
		Log        bool   `json:"log"`     // optional
	}
	if parseErr := c.BodyParser(&reqBody); parseErr != nil {
		return reject(fiber.StatusBadRequest, fmt.Sprintf("Failed to parse parameters: %v", parseErr))
	}

	// Mandatory fields, checked in this order so the first missing one wins.
	mandatory := []struct {
		value   string
		message string
	}{
		{reqBody.CircleID, "Circle ID is required"},
		{reqBody.Topic, "Topic is required"},
		{reqBody.Params, "Params are required"},
		{reqBody.ParamsType, "ParamsType is required"},
	}
	for _, field := range mandatory {
		if field.value == "" {
			return reject(fiber.StatusBadRequest, field.message)
		}
	}

	job := herojobs.NewJob()
	job.CircleID = reqBody.CircleID
	job.Topic = reqBody.Topic
	job.SessionKey = reqBody.SessionKey
	job.Params = reqBody.Params

	// Only the known parameter types are accepted; anything else is a 400.
	switch requested := herojobs.ParamsType(reqBody.ParamsType); requested {
	case herojobs.ParamsTypeHeroScript,
		herojobs.ParamsTypeRhaiScript,
		herojobs.ParamsTypeOpenRPC,
		herojobs.ParamsTypeAI:
		job.ParamsType = requested
	default:
		return reject(fiber.StatusBadRequest, fmt.Sprintf("Invalid ParamsType: %s", reqBody.ParamsType))
	}

	// Zero or negative timeouts leave the value set by NewJob untouched.
	if reqBody.Timeout > 0 {
		job.Timeout = reqBody.Timeout
	}
	job.Log = reqBody.Log

	// Per the original note, Save is what assigns the JobID.
	if saveErr := job.Save(); saveErr != nil {
		h.logger.Printf("Failed to save new job to OurDB: %v", saveErr)
		return reject(fiber.StatusInternalServerError, fmt.Sprintf("Failed to save new job: %v", saveErr))
	}

	if storeErr := h.client.StoreJob(job); storeErr != nil {
		h.logger.Printf("Failed to store new job in Redis: %v", storeErr)
		return reject(fiber.StatusInternalServerError, fmt.Sprintf("Failed to store new job in Redis: %v", storeErr))
	}

	if enqueueErr := h.client.EnqueueJob(job); enqueueErr != nil {
		h.logger.Printf("Failed to enqueue new job in Redis: %v", enqueueErr)
		return reject(fiber.StatusInternalServerError, fmt.Sprintf("Failed to enqueue new job: %v", enqueueErr))
	}

	return c.JSON(job)
}