2025-04-23 04:18:28 +02:00
parent 10a7d9bb6b
commit a16ac8f627
276 changed files with 85166 additions and 1 deletions

179
pkg/jobsmanager/README.md Normal file

@@ -0,0 +1,179 @@
# HeroJobs Package
The HeroJobs package provides a job queue and processing system for the HeroLauncher project. It runs scripts and API calls asynchronously, backed by persistent storage and per-topic queues.
## Overview
HeroJobs uses a dual storage system combining Redis and OurDB. Redis handles live queue management and provides fast access to running jobs, while OurDB offers persistent storage with auto-incrementing IDs. Jobs can be submitted to topic-specific queues within a circle, processed by a watchdog, and their results can be retrieved asynchronously.
## Components
### Job
A `Job` represents a unit of work to be processed. Each job has:
- **JobID**: Unique identifier for the job (auto-incremented by OurDB)
- **SessionKey**: Session identifier for authentication
- **CircleID**: Circle identifier for organization
- **Topic**: Topic for categorization
- **Params**: The payload to execute (a script or an API call)
- **ParamsType**: Type of parameters (HeroScript, RhaiScript, OpenRPC, AI)
- **Status**: Current status (new, active, error, done)
- **Timeout**: Maximum execution time in seconds
- **TimeScheduled/TimeStart/TimeEnd**: Timestamps for job lifecycle
- **Error/Result**: Error message or result data
- **Log**: Whether to enable logging for this job
### Storage
#### Dual Storage System
Jobs are stored in both Redis and OurDB:
#### Redis
- Handles all queue operations (adding/removing jobs)
- Stores all running jobs for fast access
- Used for real-time operations and status updates
- **Job Storage**: `herojobs:<circleID>:<topic>:<jobID>` or legacy `jobsmanager:<jobID>`
- **Queue**: `heroqueue:<circleID>:<topic>`
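The key layout above can be sketched as two small helpers. This is only an illustration: the helper names are made up for the example, and it assumes the circle ID and topic have already been normalized (the package itself runs them through `tools.NameFix` first).

```go
package main

import "fmt"

// queueKey builds the Redis list key that holds the job IDs of one circle/topic queue.
// Hypothetical helper: assumes circleID and topic are already normalized.
func queueKey(circleID, topic string) string {
	return fmt.Sprintf("heroqueue:%s:%s", circleID, topic)
}

// storageKey builds the Redis key under which a single job is stored.
func storageKey(circleID, topic string, jobID uint32) string {
	return fmt.Sprintf("herojobs:%s:%s:%d", circleID, topic, jobID)
}

func main() {
	fmt.Println(queueKey("mycircle", "mytopic"))
	fmt.Println(storageKey("mycircle", "mytopic", 42))
}
```

Because the circle and topic are part of the storage key, a bare job ID is not enough to locate a job stored in this format.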
#### OurDB
- Provides persistent storage with auto-incrementing IDs
- Stores job details in a file-based database
- Path: `~/hero/var/circles/<circleid>/<topic>/jobsdb`
- Used for long-term storage and job history
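As a rough sketch of how that path is assembled, the snippet below joins the segments with `filepath.Join`. The function name `jobsDBPath` and the sample home directory are assumptions made for the example only.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// jobsDBPath returns the OurDB directory for one circle/topic pair.
// Hypothetical helper: homeDir would normally come from os.UserHomeDir().
func jobsDBPath(homeDir, circleID, topic string) string {
	return filepath.Join(homeDir, "hero", "var", "circles", circleID, topic, "jobsdb")
}

func main() {
	fmt.Println(jobsDBPath("/home/alice", "mycircle", "mytopic"))
}
```

Each circle/topic pair gets its own database directory, so job IDs are only unique within that pair.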
### WatchDog
The `WatchDog` processes jobs from queues in the background:
- Runs as a goroutine that polls all Redis queues, sleeping 100 ms whenever a pass finds no jobs or listing the queues fails
- When a job is found, it's processed in a separate goroutine
- Monitors execution time and enforces timeouts
- Updates job status and results in both OurDB and Redis
- Routes jobs to appropriate processors based on ParamsType (HeroScript, RhaiScript, OpenRPC, AI)
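Routing by `ParamsType` could look roughly like the sketch below. It uses a handler map rather than a `switch`, which is a design choice of this example, not necessarily what the package does; the `handler` type and `dispatch` function are hypothetical names.

```go
package main

import "fmt"

// ParamsType mirrors the job parameter types listed above.
type ParamsType string

const (
	ParamsTypeHeroScript ParamsType = "heroscript"
	ParamsTypeRhaiScript ParamsType = "rhaiscript"
	ParamsTypeOpenRPC    ParamsType = "openrpc"
	ParamsTypeAI         ParamsType = "ai"
)

// handler is a hypothetical processor signature: params in, result or error out.
type handler func(params string) (string, error)

// dispatch looks up the handler registered for pt and runs it.
// Unknown types produce an error instead of being silently dropped.
func dispatch(handlers map[ParamsType]handler, pt ParamsType, params string) (string, error) {
	h, ok := handlers[pt]
	if !ok {
		return "", fmt.Errorf("unknown parameters type: %s", pt)
	}
	return h(params)
}

func main() {
	handlers := map[ParamsType]handler{
		ParamsTypeHeroScript: func(params string) (string, error) {
			return "ran heroscript: " + params, nil
		},
	}
	out, err := dispatch(handlers, ParamsTypeHeroScript, "!!fake.return_success")
	fmt.Println(out, err)
	_, err = dispatch(handlers, ParamsTypeAI, "hello")
	fmt.Println(err)
}
```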
## Job Processing Flow
1. **Job Creation**: A job is created with parameters and saved to OurDB to get a unique ID
2. **Enqueuing**: The job is stored in Redis and its ID is added to its Redis queue
3. **Processing**: The watchdog fetches the job ID from the Redis queue and loads the job
4. **Execution**: The job is processed based on its ParamsType
5. **Concurrent Updates**: The job status and result are updated in both OurDB and Redis during processing
6. **Completion**: Final status and results are stored in both systems
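The six steps above can be walked through with an in-memory stand-in. Everything here is hypothetical: `memStore` replaces both OurDB and Redis with a map and a slice, `runParams` is a toy executor, and jobs are taken oldest-first for simplicity.

```go
package main

import (
	"errors"
	"fmt"
)

// job is a trimmed-down stand-in for the real Job struct.
type job struct {
	ID     uint32
	Params string
	Status string
	Result string
	Error  string
}

// memStore is a hypothetical in-memory replacement for OurDB (jobs) and Redis (queue).
type memStore struct {
	nextID uint32
	jobs   map[uint32]*job
	queue  []uint32
}

func newMemStore() *memStore {
	return &memStore{nextID: 1, jobs: make(map[uint32]*job)}
}

// submit covers steps 1 and 2: assign an auto-incremented ID, then enqueue it.
func (s *memStore) submit(params string) uint32 {
	j := &job{ID: s.nextID, Params: params, Status: "new"}
	s.nextID++
	s.jobs[j.ID] = j
	s.queue = append(s.queue, j.ID)
	return j.ID
}

// processNext covers steps 3 to 6 for one job; it returns false when the queue is empty.
func (s *memStore) processNext(run func(string) (string, error)) bool {
	if len(s.queue) == 0 {
		return false
	}
	id := s.queue[0]
	s.queue = s.queue[1:]
	j := s.jobs[id]
	j.Status = "active"
	res, err := run(j.Params)
	if err != nil {
		j.Status, j.Error = "error", err.Error()
		return true
	}
	j.Status, j.Result = "done", res
	return true
}

// runParams is a toy executor: it fails for the literal params "fail".
func runParams(params string) (string, error) {
	if params == "fail" {
		return "", errors.New("simulated failure")
	}
	return "ran: " + params, nil
}

func main() {
	s := newMemStore()
	id := s.submit("hello")
	for s.processNext(runParams) {
	}
	fmt.Println(s.jobs[id].Status, s.jobs[id].Result)
}
```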
## Timeout Handling
The watchdog implements timeout handling to prevent jobs from running indefinitely:
- Each job has a configurable timeout (default: 60 seconds)
- If a job exceeds its timeout, it's terminated and marked as error
- Timeouts are enforced using Go's context package
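A minimal sketch of context-based timeout enforcement follows. The names `runWithTimeout` and `sleepFor` are invented for the example, and the sketch derives its context from `context.Background()` where the watchdog would presumably use its own context; only the 60-second default comes from the description above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepFor returns a work function that finishes after d unless ctx is cancelled first.
func sleepFor(d time.Duration) func(ctx context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		select {
		case <-time.After(d):
			return "done", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// runWithTimeout runs work and returns its outcome, or an error once the deadline passes.
// A timeout of zero or less falls back to 60 seconds.
func runWithTimeout(timeout time.Duration, work func(ctx context.Context) (string, error)) (string, error) {
	if timeout <= 0 {
		timeout = 60 * time.Second
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	type outcome struct {
		result string
		err    error
	}
	// Buffered so the worker goroutine can always deliver and exit, even after a timeout.
	done := make(chan outcome, 1)
	go func() {
		res, err := work(ctx)
		done <- outcome{res, err}
	}()

	select {
	case o := <-done:
		return o.result, o.err
	case <-ctx.Done():
		return "", fmt.Errorf("job timed out after %s: %w", timeout, ctx.Err())
	}
}

func main() {
	fmt.Println(runWithTimeout(time.Second, sleepFor(10*time.Millisecond)))
	fmt.Println(runWithTimeout(50*time.Millisecond, sleepFor(time.Second)))
}
```

Note that cancellation is cooperative: the work function has to watch `ctx.Done()` itself, otherwise it keeps running in the background after the caller has already reported the timeout.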
## Concurrency Management
The watchdog uses Go's concurrency primitives to safely manage multiple jobs:
- Each job is processed in a separate goroutine
- A wait group tracks all running goroutines
- A mutex protects access to shared data structures
- Context cancellation provides clean shutdown
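The four primitives listed above fit together roughly as in this sketch. The `pool` type and its methods are hypothetical and much simpler than the watchdog; the work functions receive the context's `Done` channel so they can react to shutdown.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// pool is a hypothetical miniature of the watchdog's concurrency setup.
type pool struct {
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup // tracks running goroutines
	mu      sync.Mutex     // protects results
	results map[int]string
}

func newPool() *pool {
	ctx, cancel := context.WithCancel(context.Background())
	return &pool{ctx: ctx, cancel: cancel, results: make(map[int]string)}
}

// submit runs work for the given id in its own goroutine.
func (p *pool) submit(id int, work func(done <-chan struct{}) string) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		res := work(p.ctx.Done())
		p.mu.Lock()
		defer p.mu.Unlock()
		p.results[id] = res
	}()
}

// stop cancels the context, waits for every goroutine, and returns a copy of the results.
func (p *pool) stop() map[int]string {
	p.cancel()
	p.wg.Wait()
	p.mu.Lock()
	defer p.mu.Unlock()
	out := make(map[int]string, len(p.results))
	for id, res := range p.results {
		out[id] = res
	}
	return out
}

func main() {
	p := newPool()
	for i := 0; i < 3; i++ {
		p.submit(i, func(done <-chan struct{}) string { return "finished" })
	}
	// This job blocks until shutdown is requested.
	p.submit(99, func(done <-chan struct{}) string {
		<-done
		return "cancelled"
	})
	fmt.Println(len(p.stop()))
}
```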
## Usage Examples
### Starting the WatchDog
```go
// Initialize Redis client
redisClient, err := jobsmanager.NewRedisClient("localhost:6379", false)
if err != nil {
	log.Fatalf("Failed to connect to Redis: %v", err)
}
defer redisClient.Close()

// Create and start watchdog
watchdog := jobsmanager.NewWatchDog(redisClient)
watchdog.Start()

// Handle shutdown (deferred calls run in reverse order, so the watchdog stops before Redis closes)
defer watchdog.Stop()
```
### Submitting a Job
```go
// Create a new job
job := jobsmanager.NewJob()
job.CircleID = "myCircle"
job.Topic = "myTopic"
job.Params = `
!!fake.return_success
message: "This is a test job"
`
job.ParamsType = jobsmanager.ParamsTypeHeroScript
job.Timeout = 30 // 30 seconds timeout

// Save the job to OurDB to get an ID
if err := job.Save(); err != nil {
	log.Printf("Failed to save job: %v", err)
	return
}

// Enqueue the job in Redis. EnqueueJob stores the job first and then
// pushes its ID onto the queue, so a separate StoreJob call is not needed.
if err := redisClient.EnqueueJob(job); err != nil {
	log.Printf("Failed to enqueue job: %v", err)
	return
}

// Remember the job ID for later retrieval
jobID := job.JobID
```
### Checking Job Status
```go
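// For currently running jobs, use Redis for fastest access.
// Jobs written by StoreJob live under herojobs:<circleID>:<topic>:<jobID>, so
// look them up with a composite "circleID:topic:jobID" string; a bare numeric
// ID resolves to the legacy jobsmanager:<jobID> key instead.
compositeID := fmt.Sprintf("%s:%s:%d", "myCircle", "myTopic", jobID)
job, err := redisClient.GetJob(compositeID)
if err != nil {
	// If not found in Redis, try OurDB for historical jobs.
	// CircleID and Topic select the OurDB directory, so set them as well.
	job = &jobsmanager.Job{JobID: jobID, CircleID: "myCircle", Topic: "myTopic"}
	if err := job.Load(); err != nil {
		log.Printf("Failed to load job: %v", err)
		return
	}
}

// Check job status
switch job.Status {
case jobsmanager.JobStatusNew:
	fmt.Println("Job is waiting to be processed")
case jobsmanager.JobStatusActive:
	fmt.Println("Job is currently being processed")
case jobsmanager.JobStatusDone:
	fmt.Printf("Job completed successfully: %s\n", job.Result)
case jobsmanager.JobStatusError:
	fmt.Printf("Job failed: %s\n", job.Error)
}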
```
## Processing Implementation
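The watchdog dispatches each job to a processor chosen by its `ParamsType`. In this commit every processor is still a stub that marks the job as `error` with a "not implemented yet" message; the intended behavior is: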
### HeroScript Processing
Jobs with ParamsType = ParamsTypeHeroScript are processed by passing the script to a HeroScript handler.
### RhaiScript Processing
Jobs with ParamsType = ParamsTypeRhaiScript are processed by the Rhai scripting engine.
### OpenRPC Processing
Jobs with ParamsType = ParamsTypeOpenRPC are processed by passing the parameters to an OpenRPC handler.
### AI Processing
Jobs with ParamsType = ParamsTypeAI are processed by an AI system (experimental).
This modular design allows for extensible script and API call processing through appropriate handlers.

242
pkg/jobsmanager/job.go Normal file

@@ -0,0 +1,242 @@
package jobsmanager
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/freeflowuniverse/heroagent/pkg/data/ourdb"
"github.com/freeflowuniverse/heroagent/pkg/tools"
)
// JobStatus represents the status of a job
type JobStatus string
const (
// JobStatusNew indicates a newly created job
JobStatusNew JobStatus = "new"
// JobStatusActive indicates a job that is currently being processed
JobStatusActive JobStatus = "active"
// JobStatusError indicates a job that encountered an error
JobStatusError JobStatus = "error"
// JobStatusDone indicates a job that has been completed successfully
JobStatusDone JobStatus = "done"
)
// ParamsType represents the type of parameters for a job
type ParamsType string
const (
// ParamsTypeHeroScript indicates parameters in HeroScript format
ParamsTypeHeroScript ParamsType = "heroscript"
// ParamsTypeRhaiScript indicates parameters in RhaiScript format
ParamsTypeRhaiScript ParamsType = "rhaiscript"
// ParamsTypeOpenRPC indicates parameters in OpenRPC format
ParamsTypeOpenRPC ParamsType = "openrpc"
// ParamsTypeAI indicates parameters to be handled by an AI system
ParamsTypeAI ParamsType = "ai"
)
// Job represents a job to be processed
type Job struct {
JobID uint32 `json:"jobid"`
SessionKey string `json:"sessionkey"`
CircleID string `json:"circleid"`
Topic string `json:"topic"`
Params string `json:"params"` //can be a script, rpc, etc.
ParamsType ParamsType `json:"params_type"` // Type of params: heroscript, rhaiscript, openrpc, ai
Timeout int64 `json:"timeout"`
Status JobStatus `json:"status"`
TimeScheduled int64 `json:"time_scheduled"`
TimeStart int64 `json:"time_start"`
TimeEnd int64 `json:"time_end"`
Error string `json:"error"`
Result string `json:"result"`
Log bool `json:"log"` // Whether to enable logging for this job
// OurDB client for job storage
dbClient *ourdb.Client
}
// NewJob creates a new job with default values
func NewJob() *Job {
now := time.Now().Unix()
return &Job{
JobID: 0, // Default to 0, will be auto-incremented by OurDB
Topic: "default",
Status: JobStatusNew,
ParamsType: ParamsTypeHeroScript, // Default to HeroScript
TimeScheduled: now,
}
}
// NewJobFromJSON creates a new job from a JSON string
func NewJobFromJSON(jsonStr string) (*Job, error) {
job := &Job{}
err := json.Unmarshal([]byte(jsonStr), job)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal job: %w", err)
}
// Set default values if not provided.
// A JobID of 0 is left as-is: OurDB assigns an auto-incremented ID on Save.
if job.Topic == "" {
job.Topic = "default"
}
if job.Status == "" {
job.Status = JobStatusNew
}
if job.ParamsType == "" {
job.ParamsType = ParamsTypeOpenRPC
}
if job.TimeScheduled == 0 {
job.TimeScheduled = time.Now().Unix()
}
return job, nil
}
// ToJSON converts the job to a JSON string
func (j *Job) ToJSON() (string, error) {
bytes, err := json.Marshal(j)
if err != nil {
return "", fmt.Errorf("failed to marshal job: %w", err)
}
return string(bytes), nil
}
// QueueKey returns the Redis queue key for this job
func (j *Job) QueueKey() string {
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(j.CircleID)
fixedTopic := tools.NameFix(j.Topic)
return fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
}
// StorageKey returns the Redis storage key for this job
func (j *Job) StorageKey() string {
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(j.CircleID)
fixedTopic := tools.NameFix(j.Topic)
return fmt.Sprintf("herojobs:%s:%s:%d", fixedCircleID, fixedTopic, j.JobID)
}
// initOurDbClient initializes the OurDB client connection for the job
func (j *Job) initOurDbClient() error {
if j.dbClient != nil {
return nil
}
homeDir, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to get user home directory: %w", err)
}
// Create path as ~/hero/var/circles/$circleid/$topic/jobsdb
dbPath := filepath.Join(homeDir, "hero", "var", "circles", j.CircleID, j.Topic, "jobsdb")
// Ensure directory exists
if err := os.MkdirAll(dbPath, 0755); err != nil {
return fmt.Errorf("failed to create database directory: %w", err)
}
// Initialize OurDB client
client, err := ourdb.NewClient(dbPath)
if err != nil {
return fmt.Errorf("failed to create OurDB client: %w", err)
}
j.dbClient = client
return nil
}
// Save stores the job in OurDB using auto-incrementing JobID
func (j *Job) Save() error {
// Ensure OurDB client is initialized
if err := j.initOurDbClient(); err != nil {
return err
}
// Convert job to JSON
jobData, err := json.Marshal(j)
if err != nil {
return fmt.Errorf("failed to marshal job to JSON: %w", err)
}
// If JobID is 0, let OurDB assign an auto-incremented ID
if j.JobID == 0 {
// Use OurDB Add method to auto-generate ID
id, err := j.dbClient.Add(jobData)
if err != nil {
return fmt.Errorf("failed to add job to OurDB: %w", err)
}
j.JobID = id
} else {
// Save to OurDB with specified ID
if err := j.dbClient.Set(j.JobID, jobData); err != nil {
return fmt.Errorf("failed to save job to OurDB: %w", err)
}
}
return nil
}
// Load retrieves the job from OurDB using the JobID as the key
func (j *Job) Load() error {
// Ensure OurDB client is initialized
if err := j.initOurDbClient(); err != nil {
return err
}
// Validate that JobID is not zero
if j.JobID == 0 {
return fmt.Errorf("cannot load job with ID 0: no ID specified")
}
// Get from OurDB using the numeric JobID directly
jobData, err := j.dbClient.Get(j.JobID)
if err != nil {
return fmt.Errorf("failed to load job from OurDB: %w", err)
}
// Parse job data
tempJob := &Job{}
if err := json.Unmarshal(jobData, tempJob); err != nil {
return fmt.Errorf("failed to unmarshal job data: %w", err)
}
// Copy values to current job (except the dbClient)
dbClient := j.dbClient
*j = *tempJob
j.dbClient = dbClient
return nil
}
// Finish completes the job, updating all properties and saving to OurDB
func (j *Job) Finish(status JobStatus, result string, err error) error {
// Update job properties
j.Status = status
j.TimeEnd = time.Now().Unix()
j.Result = result
if err != nil {
j.Error = err.Error()
}
// Save updated job to OurDB
return j.Save()
}
// Close closes the OurDB client connection
func (j *Job) Close() error {
if j.dbClient != nil {
err := j.dbClient.Close()
j.dbClient = nil
return err
}
return nil
}


@@ -0,0 +1,148 @@
package jobsmanager
import (
"context"
"fmt"
"log"
"time"
)
// processJob processes a job
func (d *WatchDog) processJob(job *Job) {
// First, initialize OurDB connection and save the job
if err := job.initOurDbClient(); err != nil {
log.Printf("Error initializing OurDB client: %v", err)
} else {
// Set job status to active and save it to OurDB
job.Status = JobStatusActive
job.TimeStart = time.Now().Unix()
if err := job.Save(); err != nil {
log.Printf("Error saving job to OurDB: %v", err)
}
// Make sure to close the DB connection when we're done
defer job.Close()
}
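// Also update Redis for backward compatibility.
// Note: UpdateJobStatus passes the bare numeric ID to GetJob, which resolves it to the
// legacy jobsmanager:<jobID> key, not the herojobs:<circleID>:<topic>:<jobID> key written by StoreJob.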
err := d.redisClient.UpdateJobStatus(job.JobID, JobStatusActive)
if err != nil {
log.Printf("Error updating job status in Redis: %v", err)
}
// Create a context with timeout
timeout := time.Duration(job.Timeout) * time.Second
if timeout == 0 {
timeout = 60 * time.Second // Default timeout: 60 seconds
}
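ctx, cancel := context.WithTimeout(d.ctx, timeout)
// Note: the process* handlers below start a goroutine and return immediately,
// so this deferred cancel fires as soon as the job has been dispatched.
defer cancel()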
// Process job based on parameters type
switch job.ParamsType {
case ParamsTypeHeroScript:
d.processHeroScript(ctx, job)
case ParamsTypeRhaiScript:
d.processRhaiScript(ctx, job)
case ParamsTypeOpenRPC:
d.processOpenRPC(ctx, job)
case ParamsTypeAI:
d.processAI(ctx, job)
default:
// Unknown parameters type
errMsg := fmt.Sprintf("Unknown parameters type: %s", job.ParamsType)
log.Printf("Error processing job %d: %s", job.JobID, errMsg)
// Update job status to error in both OurDB and Redis
job.Finish(JobStatusError, "", fmt.Errorf("%s", errMsg))
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
log.Printf("Error updating job error in Redis: %v", err)
}
}
}
// processHeroScript processes a job with HeroScript parameters
func (d *WatchDog) processHeroScript(ctx context.Context, job *Job) {
// Start a goroutine to process the job
d.wg.Add(1)
go func() {
defer d.wg.Done()
// TODO: Implement HeroScript processing
errMsg := "HeroScript processing not implemented yet"
log.Printf("Error processing job %d: %s", job.JobID, errMsg)
// Update job status in OurDB
job.Finish(JobStatusError, "", fmt.Errorf("%s", errMsg))
// Also update Redis for backward compatibility
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
log.Printf("Error updating job error in Redis: %v", err)
}
}()
}
// processRhaiScript processes a job with RhaiScript parameters
func (d *WatchDog) processRhaiScript(ctx context.Context, job *Job) {
// Start a goroutine to process the job
d.wg.Add(1)
go func() {
defer d.wg.Done()
// TODO: Implement RhaiScript processing
errMsg := "RhaiScript processing not implemented yet"
log.Printf("Error processing job %d: %s", job.JobID, errMsg)
// Update job status in OurDB
job.Finish(JobStatusError, "", fmt.Errorf("%s", errMsg))
// Also update Redis for backward compatibility
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
log.Printf("Error updating job error in Redis: %v", err)
}
}()
}
// processOpenRPC processes a job with OpenRPC parameters
func (d *WatchDog) processOpenRPC(ctx context.Context, job *Job) {
// Start a goroutine to process the job
d.wg.Add(1)
go func() {
defer d.wg.Done()
// TODO: Implement OpenRPC processing
errMsg := "OpenRPC processing not implemented yet"
log.Printf("Error processing job %d: %s", job.JobID, errMsg)
// Update job status in OurDB
job.Finish(JobStatusError, "", fmt.Errorf("%s", errMsg))
// Also update Redis for backward compatibility
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
log.Printf("Error updating job error in Redis: %v", err)
}
}()
}
// processAI processes a job with AI parameters
func (d *WatchDog) processAI(ctx context.Context, job *Job) {
// Start a goroutine to process the job
d.wg.Add(1)
go func() {
defer d.wg.Done()
// TODO: Implement AI processing
errMsg := "AI processing not implemented yet"
log.Printf("Error processing job %d: %s", job.JobID, errMsg)
// Update job status in OurDB
job.Finish(JobStatusError, "", fmt.Errorf("%s", errMsg))
// Also update Redis for backward compatibility
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
log.Printf("Error updating job error in Redis: %v", err)
}
}()
}

402
pkg/jobsmanager/redis.go Normal file

@@ -0,0 +1,402 @@
package jobsmanager
import (
"context"
"fmt"
"strings"
"time"
"github.com/freeflowuniverse/heroagent/pkg/tools"
"github.com/redis/go-redis/v9"
)
// RedisClient handles Redis operations for jobs
type RedisClient struct {
client *redis.Client
ctx context.Context
}
// NewRedisClient creates a new Redis client
func NewRedisClient(addr string, isUnixSocket bool) (*RedisClient, error) {
// Determine network type
networkType := "tcp"
if isUnixSocket {
networkType = "unix"
}
// Create Redis client
client := redis.NewClient(&redis.Options{
Network: networkType,
Addr: addr,
DB: 0,
DialTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
})
// Test connection
ctx := context.Background()
_, err := client.Ping(ctx).Result()
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
}
return &RedisClient{
client: client,
ctx: ctx,
}, nil
}
// Close closes the Redis client
func (r *RedisClient) Close() error {
return r.client.Close()
}
// StoreJob stores a job in Redis
func (r *RedisClient) StoreJob(job *Job) error {
// Convert job to JSON
jobJSON, err := job.ToJSON()
if err != nil {
return err
}
// Store job in Redis
err = r.client.HSet(r.ctx, job.StorageKey(), "job", jobJSON).Err()
if err != nil {
return fmt.Errorf("failed to store job: %w", err)
}
return nil
}
// GetJob retrieves a job from Redis by ID
func (r *RedisClient) GetJob(jobID interface{}) (*Job, error) {
// For interface{} type, we need more information to construct the storage key
// Two approaches:
// 1. For numeric JobID only (no circle or topic), use legacy format
// 2. For full key with pattern "circleID:topic:jobID", parse out components
var storageKey string
// Handle different types of jobID
switch id := jobID.(type) {
case uint32:
// Legacy format for backward compatibility
storageKey = fmt.Sprintf("jobsmanager:%d", id)
case string:
// Check if this is a composite key (circleID:topic:jobID)
parts := strings.Split(id, ":")
if len(parts) == 3 {
// This is a composite key
circleID := tools.NameFix(parts[0])
topic := tools.NameFix(parts[1])
// Try to convert last part to uint32
var numericID uint32
if _, err := fmt.Sscanf(parts[2], "%d", &numericID); err == nil {
storageKey = fmt.Sprintf("herojobs:%s:%s:%d", circleID, topic, numericID)
} else {
// Legacy string ID format in composite key
storageKey = fmt.Sprintf("herojobs:%s:%s:%s", circleID, topic, parts[2])
}
} else {
// Try to convert string to uint32 (legacy format)
var numericID uint32
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
storageKey = fmt.Sprintf("jobsmanager:%d", numericID)
} else {
// Legacy string ID format
storageKey = fmt.Sprintf("jobsmanager:%s", id)
}
}
default:
return nil, fmt.Errorf("unsupported job ID type: %T", jobID)
}
// Get job from Redis
jobJSON, err := r.client.HGet(r.ctx, storageKey, "job").Result()
if err != nil {
if err == redis.Nil {
return nil, fmt.Errorf("job not found: %v", jobID)
}
return nil, fmt.Errorf("failed to get job: %w", err)
}
// Parse job JSON
job, err := NewJobFromJSON(jobJSON)
if err != nil {
return nil, err
}
return job, nil
}
// DeleteJob deletes a job from Redis
func (r *RedisClient) DeleteJob(jobID interface{}) error {
// Handle different types of jobID similar to GetJob
var storageKey string
switch id := jobID.(type) {
case uint32:
// Legacy format for backward compatibility
storageKey = fmt.Sprintf("jobsmanager:%d", id)
case string:
// Check if this is a composite key (circleID:topic:jobID)
parts := strings.Split(id, ":")
if len(parts) == 3 {
// This is a composite key
circleID := tools.NameFix(parts[0])
topic := tools.NameFix(parts[1])
// Try to convert last part to uint32
var numericID uint32
if _, err := fmt.Sscanf(parts[2], "%d", &numericID); err == nil {
storageKey = fmt.Sprintf("herojobs:%s:%s:%d", circleID, topic, numericID)
} else {
// Legacy string ID format in composite key
storageKey = fmt.Sprintf("herojobs:%s:%s:%s", circleID, topic, parts[2])
}
} else {
// Try to convert string to uint32 (legacy format)
var numericID uint32
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
storageKey = fmt.Sprintf("jobsmanager:%d", numericID)
} else {
// Legacy string ID format
storageKey = fmt.Sprintf("jobsmanager:%s", id)
}
}
default:
return fmt.Errorf("unsupported job ID type: %T", jobID)
}
// Delete job from Redis
err := r.client.Del(r.ctx, storageKey).Err()
if err != nil {
return fmt.Errorf("failed to delete job: %w", err)
}
return nil
}
// EnqueueJob adds a job to its queue
func (r *RedisClient) EnqueueJob(job *Job) error {
// Store the job first
err := r.StoreJob(job)
if err != nil {
return err
}
// Add job ID to queue
err = r.client.RPush(r.ctx, job.QueueKey(), job.JobID).Err()
if err != nil {
return fmt.Errorf("failed to enqueue job: %w", err)
}
return nil
}
// QueueSize returns the size of a queue
func (r *RedisClient) QueueSize(circleID, topic string) (int64, error) {
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(circleID)
fixedTopic := tools.NameFix(topic)
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
// Get queue size
size, err := r.client.LLen(r.ctx, queueKey).Result()
if err != nil {
return 0, fmt.Errorf("failed to get queue size: %w", err)
}
return size, nil
}
// QueueEmpty empties a queue and deletes all corresponding jobs
func (r *RedisClient) QueueEmpty(circleID, topic string) error {
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(circleID)
fixedTopic := tools.NameFix(topic)
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
// Get all job IDs from queue
jobIDs, err := r.client.LRange(r.ctx, queueKey, 0, -1).Result()
if err != nil {
return fmt.Errorf("failed to get job IDs from queue: %w", err)
}
// Delete all jobs
for _, jobIDStr := range jobIDs {
// Convert string ID to uint32 if possible
var jobID uint32
if _, err := fmt.Sscanf(jobIDStr, "%d", &jobID); err == nil {
// New format with CircleID and Topic
storageKey := fmt.Sprintf("herojobs:%s:%s:%d", fixedCircleID, fixedTopic, jobID)
err := r.client.Del(r.ctx, storageKey).Err()
if err != nil {
return fmt.Errorf("failed to delete job %d: %w", jobID, err)
}
} else {
// Handle legacy string IDs
storageKey := fmt.Sprintf("jobsmanager:%s", jobIDStr)
err := r.client.Del(r.ctx, storageKey).Err()
if err != nil {
return fmt.Errorf("failed to delete job %s: %w", jobIDStr, err)
}
}
}
// Empty queue
err = r.client.Del(r.ctx, queueKey).Err()
if err != nil {
return fmt.Errorf("failed to empty queue: %w", err)
}
return nil
}
// QueueGet returns the job at the tail of a queue without removing it.
// EnqueueJob appends with RPUSH, so the tail is the most recently enqueued job.
func (r *RedisClient) QueueGet(circleID, topic string) (*Job, error) {
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(circleID)
fixedTopic := tools.NameFix(topic)
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
// Get last job ID from queue
jobID, err := r.client.LIndex(r.ctx, queueKey, -1).Result()
if err != nil {
if err == redis.Nil {
return nil, fmt.Errorf("queue is empty")
}
return nil, fmt.Errorf("failed to get job ID from queue: %w", err)
}
// Get job with context about circle and topic
compositeID := fmt.Sprintf("%s:%s:%s", circleID, topic, jobID)
return r.GetJob(compositeID)
}
// QueueFetch removes and returns the job at the tail of a queue.
// Combined with RPUSH in EnqueueJob, RPOP yields last-in, first-out order.
func (r *RedisClient) QueueFetch(circleID, topic string) (*Job, error) {
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(circleID)
fixedTopic := tools.NameFix(topic)
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
// Get and remove last job ID from queue
jobID, err := r.client.RPop(r.ctx, queueKey).Result()
if err != nil {
if err == redis.Nil {
return nil, fmt.Errorf("queue is empty")
}
return nil, fmt.Errorf("failed to fetch job ID from queue: %w", err)
}
// Get job with context about circle and topic
compositeID := fmt.Sprintf("%s:%s:%s", circleID, topic, jobID)
return r.GetJob(compositeID)
}
// ListJobs lists job IDs by circle and topic
func (r *RedisClient) ListJobs(circleID, topic string) ([]uint32, error) {
var pattern string
// Apply name fixing to CircleID and Topic
fixedCircleID := tools.NameFix(circleID)
fixedTopic := tools.NameFix(topic)
if circleID != "" && topic != "" {
pattern = fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
} else if circleID != "" {
pattern = fmt.Sprintf("heroqueue:%s:*", fixedCircleID)
} else if topic != "" {
pattern = fmt.Sprintf("heroqueue:*:%s", fixedTopic)
} else {
pattern = "heroqueue:*:*"
}
// Get all matching queue keys
queueKeys, err := r.client.Keys(r.ctx, pattern).Result()
if err != nil {
return nil, fmt.Errorf("failed to list queues: %w", err)
}
var jobIDs []uint32
for _, queueKey := range queueKeys {
// Get all job IDs from queue
stringIDs, err := r.client.LRange(r.ctx, queueKey, 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("failed to get job IDs from queue %s: %w", queueKey, err)
}
// Convert string IDs to uint32
for _, idStr := range stringIDs {
var id uint32
if _, err := fmt.Sscanf(idStr, "%d", &id); err == nil {
jobIDs = append(jobIDs, id)
} else {
// Log but continue - this handles legacy string IDs that can't be converted
fmt.Printf("Warning: Found job ID that couldn't be converted to uint32: %s\n", idStr)
}
}
}
return jobIDs, nil
}
// UpdateJobStatus updates the status of a job
func (r *RedisClient) UpdateJobStatus(jobID uint32, status JobStatus) error {
// Get job
job, err := r.GetJob(jobID)
if err != nil {
return err
}
// Update status
job.Status = status
// Update timestamps based on status
now := time.Now().Unix()
if status == JobStatusActive && job.TimeStart == 0 {
job.TimeStart = now
} else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 {
job.TimeEnd = now
}
// Store updated job
return r.StoreJob(job)
}
// UpdateJobResult updates the result of a job
func (r *RedisClient) UpdateJobResult(jobID uint32, result string) error {
// Get job
job, err := r.GetJob(jobID)
if err != nil {
return err
}
// Update result
job.Result = result
job.Status = JobStatusDone
job.TimeEnd = time.Now().Unix()
// Store updated job
return r.StoreJob(job)
}
// UpdateJobError updates the error of a job
func (r *RedisClient) UpdateJobError(jobID uint32, errorMsg string) error {
// Get job
job, err := r.GetJob(jobID)
if err != nil {
return err
}
// Update error
job.Error = errorMsg
job.Status = JobStatusError
job.TimeEnd = time.Now().Unix()
// Store updated job
return r.StoreJob(job)
}

115
pkg/jobsmanager/watchdog.go Normal file

@@ -0,0 +1,115 @@
package jobsmanager
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
)
// WatchDog handles the processing of jobs from queues
type WatchDog struct {
redisClient *RedisClient
processorMutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewWatchDog creates a new watchdog
func NewWatchDog(redisClient *RedisClient) *WatchDog {
ctx, cancel := context.WithCancel(context.Background())
return &WatchDog{
redisClient: redisClient,
ctx: ctx,
cancel: cancel,
}
}
// Start starts the watchdog
func (d *WatchDog) Start() {
d.wg.Add(1)
go d.processQueues()
}
// Stop stops the watchdog
func (d *WatchDog) Stop() {
d.cancel()
d.wg.Wait()
}
// processQueues processes all queues
func (d *WatchDog) processQueues() {
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
return
default:
// Get all queues
queues, err := d.getQueues()
if err != nil {
log.Printf("Error getting queues: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
// Process each queue
processed := false
for _, queue := range queues {
circleID, topic := queue.circleID, queue.topic
job, err := d.redisClient.QueueFetch(circleID, topic)
if err != nil {
// Queue is empty or the fetch failed; continue to the next queue
continue
}
// Process the job
processed = true
d.processJob(job)
}
// If no jobs were processed, wait before checking again
if !processed {
time.Sleep(100 * time.Millisecond)
}
}
}
}
// queueInfo represents a queue
type queueInfo struct {
circleID string
topic string
}
// getQueues returns all queues
func (d *WatchDog) getQueues() ([]queueInfo, error) {
// Get all queue keys from Redis
queueKeys, err := d.redisClient.client.Keys(d.redisClient.ctx, "heroqueue:*:*").Result()
if err != nil {
return nil, fmt.Errorf("failed to list queues: %w", err)
}
var queues []queueInfo
for _, queueKey := range queueKeys {
// Parse queue key (format: heroqueue:<circleID>:<topic>)
// Split the key by ":"
parts := strings.Split(queueKey, ":")
if len(parts) != 3 {
log.Printf("Invalid queue key format: %s", queueKey)
continue
}
queues = append(queues, queueInfo{
circleID: parts[1],
topic: parts[2],
})
}
return queues, nil
}