package heroagent

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"git.ourworld.tf/herocode/heroagent/pkg/data/ourdb"
)

// JobStatus represents the status of a job.
type JobStatus string

const (
	// JobStatusNew indicates a newly created job.
	JobStatusNew JobStatus = "new"
	// JobStatusActive indicates a job that is currently being processed.
	JobStatusActive JobStatus = "active"
	// JobStatusError indicates a job that encountered an error.
	JobStatusError JobStatus = "error"
	// JobStatusDone indicates a job that has been completed successfully.
	JobStatusDone JobStatus = "done"
)

// Job represents a job to be processed. It is persisted as JSON in OurDB
// under its JobID. The Time* fields hold Unix timestamps in seconds; a zero
// value means the corresponding event has not happened yet.
type Job struct {
	JobID         uint32    `json:"jobid"`
	Topic         string    `json:"topic"`
	Params        string    `json:"params"`
	Status        JobStatus `json:"status"`
	TimeScheduled int64     `json:"time_scheduled"`
	TimeStart     int64     `json:"time_start"`
	TimeEnd       int64     `json:"time_end"`
	Error         string    `json:"error"`
	Result        string    `json:"result"`
}

// JobManager handles job management between OurDB and Redis. OurDB holds the
// record of every job; Redis holds the queues the workers poll.
type JobManager struct {
	config    JobsConfig
	ourDB     *ourdb.OurDB
	redisConn *RedisConnection
	redisMgr  *RedisJobManager
	workers   []*JobWorker
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

// RedisConnection wraps Redis connection details.
type RedisConnection struct {
	TCPPort        int
	UnixSocketPath string
}

// JobWorker represents a worker that processes jobs.
type JobWorker struct {
	id     int
	jobMgr *JobManager
	ctx    context.Context
	wg     *sync.WaitGroup
}

// NewJobManager creates a new job manager.
//
// It creates the OurDB directory if needed, opens OurDB in incremental mode
// and connects the Redis job manager using the details in redisConn. It
// returns an error if redisConn is nil or if any of those steps fails; OurDB
// is closed again when the Redis job manager cannot be created.
func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) {
	if redisConn == nil {
		return nil, fmt.Errorf("failed to create Redis job manager: redis connection details are nil")
	}

	// Create OurDB directory if it doesn't exist.
	if err := os.MkdirAll(config.OurDBPath, 0755); err != nil {
		return nil, fmt.Errorf("failed to create OurDB directory: %w", err)
	}

	// Initialize OurDB. CreateJob relies on Set assigning the ID itself,
	// which presumably is what incremental mode enables.
	ourDBConfig := ourdb.DefaultConfig()
	ourDBConfig.Path = config.OurDBPath
	ourDBConfig.IncrementalMode = true

	db, err := ourdb.New(ourDBConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create OurDB: %w", err)
	}

	// Initialize Redis job manager.
	redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath)
	if err != nil {
		// Close OurDB before returning the error.
		if closeErr := db.Close(); closeErr != nil {
			log.Printf("Warning: failed to close OurDB: %v", closeErr)
		}
		return nil, fmt.Errorf("failed to create Redis job manager: %w", err)
	}

	// The context is created only after the last fallible step so that no
	// error path can return without releasing it.
	ctx, cancel := context.WithCancel(context.Background())

	return &JobManager{
		config:    config,
		ourDB:     db,
		redisConn: redisConn,
		redisMgr:  redisMgr,
		ctx:       ctx,
		cancel:    cancel,
	}, nil
}

// Start starts the job manager by launching config.WorkerCount workers.
//
// It returns an error, without starting anything, when the worker count is
// negative or when workers are requested with a non-positive
// QueuePollInterval; either value would otherwise cause a panic.
func (jm *JobManager) Start() error {
	log.Println("Starting job manager...")

	count := jm.config.WorkerCount
	if count < 0 {
		return fmt.Errorf("invalid worker count %d: must not be negative", count)
	}
	// time.NewTicker panics on a non-positive duration, and that panic would
	// happen inside a worker goroutine where it cannot be handled.
	if count > 0 && jm.config.QueuePollInterval <= 0 {
		return fmt.Errorf("invalid queue poll interval %v: must be positive", jm.config.QueuePollInterval)
	}

	// Start workers.
	jm.workers = make([]*JobWorker, count)
	for i := 0; i < count; i++ {
		worker := &JobWorker{
			id:     i,
			jobMgr: jm,
			ctx:    jm.ctx,
			wg:     &jm.wg,
		}
		jm.workers[i] = worker
		jm.startWorker(worker)
	}

	log.Printf("Job manager started with %d workers", count)
	return nil
}

// Stop stops the job manager: it cancels the worker context, waits for all
// workers to return, then closes the Redis job manager and OurDB.
//
// A failure to close the Redis job manager is only logged; a failure to
// close OurDB is returned.
func (jm *JobManager) Stop() error {
	log.Println("Stopping job manager...")

	// Signal all workers to stop.
	jm.cancel()

	// Wait for all workers to finish.
	jm.wg.Wait()

	// Close Redis job manager.
	if jm.redisMgr != nil {
		if err := jm.redisMgr.Close(); err != nil {
			log.Printf("Warning: failed to close Redis job manager: %v", err)
		}
	}

	// Close OurDB.
	if err := jm.ourDB.Close(); err != nil {
		return fmt.Errorf("failed to close OurDB: %w", err)
	}

	log.Println("Job manager stopped")
	return nil
}

// startWorker runs the worker in its own goroutine and registers it with the
// manager's wait group so that Stop can wait for it.
func (jm *JobManager) startWorker(worker *JobWorker) {
	jm.wg.Add(1)
	go func() {
		defer jm.wg.Done()
		worker.run()
	}()
}

// run is the main worker loop. It checks for jobs on every tick of
// config.QueuePollInterval and returns once the worker context is cancelled.
func (w *JobWorker) run() {
	log.Printf("Worker %d started", w.id)

	ticker := time.NewTicker(w.jobMgr.config.QueuePollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-w.ctx.Done():
			log.Printf("Worker %d stopping", w.id)
			return
		case <-ticker.C:
			// Check for jobs in Redis.
			if err := w.checkForJobs(); err != nil {
				log.Printf("Worker %d error checking for jobs: %v", w.id, err)
			}
		}
	}
}

// checkForJobs checks for jobs in Redis. It walks the queues reported by the
// Redis job manager and processes at most one job per call: the first one
// that can be fetched. A processing failure is logged, not returned; the
// only error returned is a failure to list the queues.
func (w *JobWorker) checkForJobs() error {
	// Get list of queues.
	queues, err := w.jobMgr.redisMgr.ListQueues()
	if err != nil {
		return fmt.Errorf("failed to list queues: %w", err)
	}

	// Check each queue for jobs.
	for _, topic := range queues {
		// Try to fetch a job from the queue.
		job, err := w.jobMgr.redisMgr.FetchNextJob(topic)
		if err != nil {
			// Any fetch error is treated as "queue is empty".
			// NOTE(review): this also hides genuine Redis failures; confirm
			// whether FetchNextJob exposes a distinguishable "empty" error.
			continue
		}
		if job == nil {
			// Nothing was fetched; guard against dereferencing nil below.
			continue
		}

		// Process the job.
		if err := w.processJob(job); err != nil {
			log.Printf("Error processing job %d: %v", job.JobID, err)
		}

		// Only process one job at a time.
		return nil
	}

	return nil
}

// processJob processes a job: it marks the job active in both stores,
// performs the (currently simulated) work, then records the job as done in
// OurDB and removes it from Redis.
func (w *JobWorker) processJob(job *Job) error {
	log.Printf("Worker %d processing job %d", w.id, job.JobID)

	// Update job status to active.
	job.Status = JobStatusActive
	job.TimeStart = time.Now().Unix()

	// Update job in both OurDB and Redis.
	if err := w.jobMgr.updateJobInBothStores(job); err != nil {
		return fmt.Errorf("failed to update job status: %w", err)
	}

	// Simulate job processing.
	// In a real implementation, this would execute the job based on its parameters.
	time.Sleep(1 * time.Second)

	// Complete the job.
	job.Status = JobStatusDone
	job.TimeEnd = time.Now().Unix()
	job.Result = fmt.Sprintf("Job %d processed successfully", job.JobID)

	// Update job in OurDB and remove from Redis.
	if err := w.jobMgr.completeJobProcessing(job); err != nil {
		return fmt.Errorf("failed to complete job: %w", err)
	}

	log.Printf("Worker %d completed job %d", w.id, job.JobID)
	return nil
}

// storeJobInOurDB serializes job to JSON and writes it to OurDB under
// job.JobID, replacing the record stored under that ID.
func (jm *JobManager) storeJobInOurDB(job *Job) error {
	jobData, err := json.Marshal(job)
	if err != nil {
		return fmt.Errorf("failed to marshal job: %w", err)
	}

	if _, err := jm.ourDB.Set(ourdb.OurDBSetArgs{
		ID:   &job.JobID,
		Data: jobData,
	}); err != nil {
		return fmt.Errorf("failed to update job in OurDB: %w", err)
	}

	return nil
}

// CreateJob creates a new job with status JobStatusNew, stores it in OurDB
// under an ID assigned by OurDB, and enqueues it in Redis.
//
// The returned job carries the assigned ID. If enqueueing fails, the OurDB
// record is marked as failed on a best-effort basis and an error is returned.
func (jm *JobManager) CreateJob(topic, params string) (*Job, error) {
	// Create new job.
	job := &Job{
		Topic:         topic,
		Params:        params,
		Status:        JobStatusNew,
		TimeScheduled: time.Now().Unix(),
	}

	jobData, err := json.Marshal(job)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal job: %w", err)
	}

	// Add job to OurDB without an ID so that OurDB assigns one.
	id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{
		Data: jobData,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to store job in OurDB: %w", err)
	}

	// The record written above was serialized before the ID was known and
	// therefore contains "jobid":0. Rewrite it so the persisted job carries
	// its real ID.
	job.JobID = id
	if err := jm.storeJobInOurDB(job); err != nil {
		return nil, err
	}

	// Store job in Redis.
	if err := jm.redisMgr.EnqueueJob(job); err != nil {
		// The job never reached a queue, so no worker will pick it up. Record
		// that in OurDB instead of leaving it in the "new" state forever.
		job.Status = JobStatusError
		job.TimeEnd = time.Now().Unix()
		job.Error = fmt.Sprintf("failed to store job in Redis: %v", err)
		if storeErr := jm.storeJobInOurDB(job); storeErr != nil {
			log.Printf("Warning: failed to mark job %d as failed in OurDB: %v", job.JobID, storeErr)
		}
		return nil, fmt.Errorf("failed to store job in Redis: %w", err)
	}

	log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID)
	return job, nil
}

// GetJob retrieves a job by ID from OurDB.
func (jm *JobManager) GetJob(jobID uint32) (*Job, error) {
	// Get job from OurDB.
	jobData, err := jm.ourDB.Get(jobID)
	if err != nil {
		return nil, fmt.Errorf("failed to get job from OurDB: %w", err)
	}

	// Parse job data.
	job := &Job{}
	if err := json.Unmarshal(jobData, job); err != nil {
		return nil, fmt.Errorf("failed to unmarshal job data: %w", err)
	}

	// The OurDB key is the authoritative ID. Records serialized before their
	// ID was assigned contain "jobid":0, so always take the ID from the key.
	job.JobID = jobID

	return job, nil
}

// UpdateJobStatus updates the status of a job in OurDB and mirrors the change
// to Redis.
//
// TimeStart is set the first time a job becomes active and TimeEnd the first
// time it becomes done or failed. A finished job is removed from Redis; any
// other status is updated there. Redis failures are only logged.
func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error {
	// Get job from OurDB.
	job, err := jm.GetJob(jobID)
	if err != nil {
		return err
	}

	// Update status.
	job.Status = status

	// Update timestamps based on status, without overwriting earlier ones.
	now := time.Now().Unix()
	finished := status == JobStatusDone || status == JobStatusError
	switch {
	case status == JobStatusActive && job.TimeStart == 0:
		job.TimeStart = now
	case finished && job.TimeEnd == 0:
		job.TimeEnd = now
	}

	// Store updated job in OurDB.
	if err := jm.storeJobInOurDB(job); err != nil {
		return err
	}

	// If job is done or has error, remove from Redis.
	if finished {
		if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
			log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
		}
		return nil
	}

	// Otherwise, update in Redis.
	if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
		log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err)
	}

	return nil
}

// CompleteJob marks a job as completed: it records the result and end time
// in OurDB and removes the job from Redis. A failure to remove the job from
// Redis is only logged.
func (jm *JobManager) CompleteJob(jobID uint32, result string) error {
	// Get job from OurDB.
	job, err := jm.GetJob(jobID)
	if err != nil {
		return err
	}

	// Update job.
	job.Status = JobStatusDone
	job.TimeEnd = time.Now().Unix()
	job.Result = result

	// Store updated job in OurDB.
	if err := jm.storeJobInOurDB(job); err != nil {
		return err
	}

	// Remove from Redis.
	if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
		log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
	}

	log.Printf("Job %d completed and removed from Redis", jobID)
	return nil
}

// FailJob marks a job as failed: it records the error message and end time
// in OurDB and removes the job from Redis. A failure to remove the job from
// Redis is only logged.
func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error {
	// Get job from OurDB.
	job, err := jm.GetJob(jobID)
	if err != nil {
		return err
	}

	// Update job.
	job.Status = JobStatusError
	job.TimeEnd = time.Now().Unix()
	job.Error = errorMsg

	// Store updated job in OurDB.
	if err := jm.storeJobInOurDB(job); err != nil {
		return err
	}

	// Remove from Redis.
	if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
		log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
	}

	log.Printf("Job %d failed and removed from Redis", jobID)
	return nil
}

// updateJobInBothStores updates a job in both OurDB and Redis. OurDB is
// written first; unlike UpdateJobStatus, a Redis failure is returned.
func (jm *JobManager) updateJobInBothStores(job *Job) error {
	// Update job in OurDB.
	if err := jm.storeJobInOurDB(job); err != nil {
		return err
	}

	// Update job in Redis.
	if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
		return fmt.Errorf("failed to update job in Redis: %w", err)
	}

	return nil
}

// completeJobProcessing updates a completed job in OurDB and removes it from
// Redis. Unlike CompleteJob, a Redis failure is returned.
func (jm *JobManager) completeJobProcessing(job *Job) error {
	// Update job in OurDB.
	if err := jm.storeJobInOurDB(job); err != nil {
		return err
	}

	// Remove from Redis.
	if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil {
		return fmt.Errorf("failed to remove job from Redis: %w", err)
	}

	return nil
}