...
This commit is contained in:
179
pkg/herojobs/README.md
Normal file
179
pkg/herojobs/README.md
Normal file
@@ -0,0 +1,179 @@
|
||||
# HeroJobs Package
|
||||
|
||||
The HeroJobs package provides a robust job queue and processing system for the HeroLauncher project. It allows for asynchronous execution of scripts and API calls through a persistence and queue system.
|
||||
|
||||
## Overview
|
||||
|
||||
HeroJobs uses a dual storage system combining Redis and OurDB. Redis handles live queue management and provides fast access to running jobs, while OurDB offers persistent storage with auto-incrementing IDs. Jobs can be submitted to topic-specific queues within a circle, processed by a watchdog, and their results can be retrieved asynchronously.
|
||||
|
||||
## Components
|
||||
|
||||
### Job
|
||||
|
||||
A `Job` represents a unit of work to be processed. Each job has:
|
||||
|
||||
- **JobID**: Unique identifier for the job (auto-incremented by OurDB)
|
||||
- **SessionKey**: Session identifier for authentication
|
||||
- **CircleID**: Circle identifier for organization
|
||||
- **Topic**: Topic for categorization
|
||||
- **Params**: The parameters to be executed (script or API call)
|
||||
- **ParamsType**: Type of parameters (HeroScript, RhaiScript, OpenRPC, AI)
|
||||
- **Status**: Current status (new, active, error, done)
|
||||
- **Timeout**: Maximum execution time in seconds
|
||||
- **TimeScheduled/TimeStart/TimeEnd**: Timestamps for job lifecycle
|
||||
- **Error/Result**: Error message or result data
|
||||
- **Log**: Whether to enable logging for this job
|
||||
|
||||
### Storage
|
||||
|
||||
#### Dual Storage System
|
||||
|
||||
Jobs are stored in both Redis and OurDB:
|
||||
|
||||
#### Redis
|
||||
- Handles all queue operations (adding/removing jobs)
|
||||
- Stores all running jobs for fast access
|
||||
- Used for real-time operations and status updates
|
||||
- **Job Storage**: `herojobs:<circleID>:<topic>:<jobID>` or legacy `herojobs:<jobID>`
|
||||
- **Queue**: `heroqueue:<circleID>:<topic>`
|
||||
|
||||
#### OurDB
|
||||
- Provides persistent storage with auto-incrementing IDs
|
||||
- Stores job details in a file-based database
|
||||
- Path: `~/hero/var/circles/<circleid>/<topic>/jobsdb`
|
||||
- Used for long-term storage and job history
|
||||
|
||||
### WatchDog
|
||||
|
||||
The `WatchDog` processes jobs from queues in the background:
|
||||
|
||||
- Runs as a goroutine that continuously checks all Redis queues
|
||||
- When a job is found, it's processed in a separate goroutine
|
||||
- Monitors execution time and enforces timeouts
|
||||
- Updates job status and results in both OurDB and Redis
|
||||
- Routes jobs to appropriate processors based on ParamsType (HeroScript, RhaiScript, OpenRPC, AI)
|
||||
|
||||
## Job Processing Flow
|
||||
|
||||
1. **Job Creation**: A job is created with parameters and saved to OurDB to get a unique ID
|
||||
2. **Enqueuing**: The job is stored in Redis and its ID is added to its Redis queue
|
||||
3. **Processing**: The watchdog fetches the job ID from the Redis queue and loads the job
|
||||
4. **Execution**: The job is processed based on its ParamsType
|
||||
5. **Concurrent Updates**: The job status and result are updated in both OurDB and Redis during processing
|
||||
6. **Completion**: Final status and results are stored in both systems
|
||||
|
||||
## Timeout Handling
|
||||
|
||||
The watchdog implements timeout handling to prevent jobs from running indefinitely:
|
||||
|
||||
- Each job has a configurable timeout (default: 60 seconds)
|
||||
- If a job exceeds its timeout, it's terminated and marked as error
|
||||
- Timeouts are enforced using Go's context package
|
||||
|
||||
## Concurrency Management
|
||||
|
||||
The watchdog uses Go's concurrency primitives to safely manage multiple jobs:
|
||||
|
||||
- Each job is processed in a separate goroutine
|
||||
- A wait group tracks all running goroutines
|
||||
- A mutex protects access to shared data structures
|
||||
- Context cancellation provides clean shutdown
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Starting the WatchDog
|
||||
|
||||
```go
|
||||
// Initialize Redis client
|
||||
redisClient, err := herojobs.NewRedisClient("localhost:6379", false)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to Redis: %v", err)
|
||||
}
|
||||
defer redisClient.Close()
|
||||
|
||||
// Create and start watchdog
|
||||
watchdog := herojobs.NewWatchDog(redisClient)
|
||||
watchdog.Start()
|
||||
|
||||
// Handle shutdown
|
||||
defer watchdog.Stop()
|
||||
```
|
||||
|
||||
### Submitting a Job
|
||||
|
||||
```go
|
||||
// Create a new job
|
||||
job := herojobs.NewJob()
|
||||
job.CircleID = "myCircle"
|
||||
job.Topic = "myTopic"
|
||||
job.Params = `
|
||||
!!fake.return_success
|
||||
message: "This is a test job"
|
||||
`
|
||||
job.ParamsType = herojobs.ParamsTypeHeroScript
|
||||
job.Timeout = 30 // 30 seconds timeout
|
||||
|
||||
// Save the job to OurDB to get an ID
|
||||
if err := job.Save(); err != nil {
|
||||
log.Printf("Failed to save job: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Store and enqueue the job in Redis
|
||||
if err := redisClient.StoreJob(job); err != nil {
|
||||
log.Printf("Failed to store job in Redis: %v", err)
|
||||
return
|
||||
}
|
||||
if err := redisClient.EnqueueJob(job); err != nil {
|
||||
log.Printf("Failed to enqueue job: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Remember the job ID for later retrieval
|
||||
jobID := job.JobID
|
||||
```
|
||||
|
||||
### Checking Job Status
|
||||
|
||||
```go
|
||||
// For currently running jobs, use Redis for fastest access
|
||||
job, err := redisClient.GetJob(jobID)
|
||||
if err != nil {
|
||||
// If not found in Redis, try OurDB for historical jobs
|
||||
job = &herojobs.Job{JobID: jobID}
|
||||
if err := job.Load(); err != nil {
|
||||
log.Printf("Failed to load job: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Check job status
|
||||
switch job.Status {
|
||||
case herojobs.JobStatusNew:
|
||||
fmt.Println("Job is waiting to be processed")
|
||||
case herojobs.JobStatusActive:
|
||||
fmt.Println("Job is currently being processed")
|
||||
case herojobs.JobStatusDone:
|
||||
fmt.Printf("Job completed successfully: %s\n", job.Result)
|
||||
case herojobs.JobStatusError:
|
||||
fmt.Printf("Job failed: %s\n", job.Error)
|
||||
}
|
||||
```
|
||||
|
||||
## Processing Implementation
|
||||
|
||||
The watchdog supports different types of job parameters processing:
|
||||
|
||||
### HeroScript Processing
|
||||
Jobs with ParamsType = ParamsTypeHeroScript are processed by passing the script to a HeroScript handler.
|
||||
|
||||
### RhaiScript Processing
|
||||
Jobs with ParamsType = ParamsTypeRhaiScript are processed by the Rhai scripting engine.
|
||||
|
||||
### OpenRPC Processing
|
||||
Jobs with ParamsType = ParamsTypeOpenRPC are processed by passing the parameters to an OpenRPC handler.
|
||||
|
||||
### AI Processing
|
||||
Jobs with ParamsType = ParamsTypeAI are processed by an AI system (experimental).
|
||||
|
||||
This modular design allows for extensible script and API call processing through appropriate handlers.
|
242
pkg/herojobs/job.go
Normal file
242
pkg/herojobs/job.go
Normal file
@@ -0,0 +1,242 @@
|
||||
package herojobs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/freeflowuniverse/heroagent/pkg/data/ourdb"
|
||||
"github.com/freeflowuniverse/heroagent/pkg/tools"
|
||||
)
|
||||
|
||||
// JobStatus represents the status of a job
|
||||
type JobStatus string
|
||||
|
||||
const (
|
||||
// JobStatusNew indicates a newly created job
|
||||
JobStatusNew JobStatus = "new"
|
||||
// JobStatusActive indicates a job that is currently being processed
|
||||
JobStatusActive JobStatus = "active"
|
||||
// JobStatusError indicates a job that encountered an error
|
||||
JobStatusError JobStatus = "error"
|
||||
// JobStatusDone indicates a job that has been completed successfully
|
||||
JobStatusDone JobStatus = "done"
|
||||
)
|
||||
|
||||
// ParamsType represents the type of parameters for a job
|
||||
type ParamsType string
|
||||
|
||||
const (
|
||||
// ParamsTypeHeroScript indicates parameters in HeroScript format
|
||||
ParamsTypeHeroScript ParamsType = "heroscript"
|
||||
// ParamsTypeRhaiScript indicates parameters in RhaiScript format
|
||||
ParamsTypeRhaiScript ParamsType = "rhaiscript"
|
||||
// ParamsTypeOpenRPC indicates parameters in OpenRPC format
|
||||
ParamsTypeOpenRPC ParamsType = "openrpc"
|
||||
ParamsTypeAI ParamsType = "ai"
|
||||
)
|
||||
|
||||
// Job represents a job to be processed
|
||||
type Job struct {
|
||||
JobID uint32 `json:"jobid"`
|
||||
SessionKey string `json:"sessionkey"`
|
||||
CircleID string `json:"circleid"`
|
||||
Topic string `json:"topic"`
|
||||
Params string `json:"params"` //can be a script, rpc, etc.
|
||||
ParamsType ParamsType `json:"params_type"` // Type of params: heroscript, rhaiscript, openrpc
|
||||
Timeout int64 `json:"timeout"`
|
||||
Status JobStatus `json:"status"`
|
||||
TimeScheduled int64 `json:"time_scheduled"`
|
||||
TimeStart int64 `json:"time_start"`
|
||||
TimeEnd int64 `json:"time_end"`
|
||||
Error string `json:"error"`
|
||||
Result string `json:"result"`
|
||||
Log bool `json:"log"` // Whether to enable logging for this job
|
||||
|
||||
// OurDB client for job storage
|
||||
dbClient *ourdb.Client
|
||||
}
|
||||
|
||||
// NewJob creates a new job with default values
|
||||
func NewJob() *Job {
|
||||
now := time.Now().Unix()
|
||||
return &Job{
|
||||
JobID: 0, // Default to 0, will be auto-incremented by OurDB
|
||||
Topic: "default",
|
||||
Status: JobStatusNew,
|
||||
ParamsType: ParamsTypeHeroScript, // Default to HeroScript
|
||||
TimeScheduled: now,
|
||||
}
|
||||
}
|
||||
|
||||
// NewJobFromJSON creates a new job from a JSON string
|
||||
func NewJobFromJSON(jsonStr string) (*Job, error) {
|
||||
job := &Job{}
|
||||
err := json.Unmarshal([]byte(jsonStr), job)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal job: %w", err)
|
||||
}
|
||||
|
||||
// Set default values if not provided
|
||||
if job.JobID == 0 {
|
||||
job.JobID = 0 // Default to 0, will be auto-incremented by OurDB
|
||||
}
|
||||
if job.Topic == "" {
|
||||
job.Topic = "default"
|
||||
}
|
||||
if job.Status == "" {
|
||||
job.Status = JobStatusNew
|
||||
}
|
||||
if job.ParamsType == "" {
|
||||
job.ParamsType = ParamsTypeOpenRPC
|
||||
}
|
||||
if job.TimeScheduled == 0 {
|
||||
job.TimeScheduled = time.Now().Unix()
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// ToJSON converts the job to a JSON string
|
||||
func (j *Job) ToJSON() (string, error) {
|
||||
bytes, err := json.Marshal(j)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to marshal job: %w", err)
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
|
||||
// QueueKey returns the Redis queue key for this job
|
||||
func (j *Job) QueueKey() string {
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(j.CircleID)
|
||||
fixedTopic := tools.NameFix(j.Topic)
|
||||
return fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
|
||||
}
|
||||
|
||||
// StorageKey returns the Redis storage key for this job
|
||||
func (j *Job) StorageKey() string {
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(j.CircleID)
|
||||
fixedTopic := tools.NameFix(j.Topic)
|
||||
return fmt.Sprintf("herojobs:%s:%s:%d", fixedCircleID, fixedTopic, j.JobID)
|
||||
}
|
||||
|
||||
// initOurDbClient initializes the OurDB client connection for the job
|
||||
func (j *Job) initOurDbClient() error {
|
||||
if j.dbClient != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
homeDir, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get user home directory: %w", err)
|
||||
}
|
||||
|
||||
// Create path as ~/hero/var/circles/$circleid/$topic/jobsdb
|
||||
dbPath := filepath.Join(homeDir, "hero", "var", "circles", j.CircleID, j.Topic, "jobsdb")
|
||||
|
||||
// Ensure directory exists
|
||||
if err := os.MkdirAll(dbPath, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create database directory: %w", err)
|
||||
}
|
||||
|
||||
// Initialize OurDB client
|
||||
client, err := ourdb.NewClient(dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OurDB client: %w", err)
|
||||
}
|
||||
|
||||
j.dbClient = client
|
||||
return nil
|
||||
}
|
||||
|
||||
// Save stores the job in OurDB using auto-incrementing JobID
|
||||
func (j *Job) Save() error {
|
||||
// Ensure OurDB client is initialized
|
||||
if err := j.initOurDbClient(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Convert job to JSON
|
||||
jobData, err := json.Marshal(j)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal job to JSON: %w", err)
|
||||
}
|
||||
|
||||
// If JobID is 0, let OurDB assign an auto-incremented ID
|
||||
if j.JobID == 0 {
|
||||
// Use OurDB Add method to auto-generate ID
|
||||
id, err := j.dbClient.Add(jobData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add job to OurDB: %w", err)
|
||||
}
|
||||
j.JobID = id
|
||||
} else {
|
||||
// Save to OurDB with specified ID
|
||||
if err := j.dbClient.Set(j.JobID, jobData); err != nil {
|
||||
return fmt.Errorf("failed to save job to OurDB: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load retrieves the job from OurDB using the JobID as the key
|
||||
func (j *Job) Load() error {
|
||||
// Ensure OurDB client is initialized
|
||||
if err := j.initOurDbClient(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate that JobID is not zero
|
||||
if j.JobID == 0 {
|
||||
return fmt.Errorf("cannot load job with ID 0: no ID specified")
|
||||
}
|
||||
|
||||
// Get from OurDB using the numeric JobID directly
|
||||
jobData, err := j.dbClient.Get(j.JobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load job from OurDB: %w", err)
|
||||
}
|
||||
|
||||
// Parse job data
|
||||
tempJob := &Job{}
|
||||
if err := json.Unmarshal(jobData, tempJob); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal job data: %w", err)
|
||||
}
|
||||
|
||||
// Copy values to current job (except the dbClient)
|
||||
dbClient := j.dbClient
|
||||
*j = *tempJob
|
||||
j.dbClient = dbClient
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Finish completes the job, updating all properties and saving to OurDB
|
||||
func (j *Job) Finish(status JobStatus, result string, err error) error {
|
||||
// Update job properties
|
||||
j.Status = status
|
||||
j.TimeEnd = time.Now().Unix()
|
||||
j.Result = result
|
||||
|
||||
if err != nil {
|
||||
j.Error = err.Error()
|
||||
}
|
||||
|
||||
// Save updated job to OurDB
|
||||
return j.Save()
|
||||
}
|
||||
|
||||
// Close closes the OurDB client connection
|
||||
func (j *Job) Close() error {
|
||||
if j.dbClient != nil {
|
||||
err := j.dbClient.Close()
|
||||
j.dbClient = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
148
pkg/herojobs/processjob.go
Normal file
148
pkg/herojobs/processjob.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package herojobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
// processJob processes a job
|
||||
func (d *WatchDog) processJob(job *Job) {
|
||||
// First, initialize OurDB connection and save the job
|
||||
if err := job.initOurDbClient(); err != nil {
|
||||
log.Printf("Error initializing OurDB client: %v", err)
|
||||
} else {
|
||||
// Set job status to active and save it to OurDB
|
||||
job.Status = JobStatusActive
|
||||
job.TimeStart = time.Now().Unix()
|
||||
|
||||
if err := job.Save(); err != nil {
|
||||
log.Printf("Error saving job to OurDB: %v", err)
|
||||
}
|
||||
|
||||
// Make sure to close the DB connection when we're done
|
||||
defer job.Close()
|
||||
}
|
||||
|
||||
// Also update Redis for backward compatibility
|
||||
err := d.redisClient.UpdateJobStatus(job.JobID, JobStatusActive)
|
||||
if err != nil {
|
||||
log.Printf("Error updating job status in Redis: %v", err)
|
||||
}
|
||||
|
||||
// Create a context with timeout
|
||||
timeout := time.Duration(job.Timeout) * time.Second
|
||||
if timeout == 0 {
|
||||
timeout = 60 * time.Second // Default timeout: 60 seconds
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(d.ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// Process job based on parameters type
|
||||
switch job.ParamsType {
|
||||
case ParamsTypeHeroScript:
|
||||
d.processHeroScript(ctx, job)
|
||||
case ParamsTypeRhaiScript:
|
||||
d.processRhaiScript(ctx, job)
|
||||
case ParamsTypeOpenRPC:
|
||||
d.processOpenRPC(ctx, job)
|
||||
case ParamsTypeAI:
|
||||
d.processAI(ctx, job)
|
||||
default:
|
||||
// Unknown parameters type
|
||||
errMsg := fmt.Sprintf("Unknown parameters type: %s", job.ParamsType)
|
||||
log.Printf("Error processing job %s: %s", job.JobID, errMsg)
|
||||
|
||||
// Update job status to error in both OurDB and Redis
|
||||
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
||||
|
||||
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
|
||||
log.Printf("Error updating job error in Redis: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processHeroScript processes a job with HeroScript parameters
|
||||
func (d *WatchDog) processHeroScript(ctx context.Context, job *Job) {
|
||||
// Start a goroutine to process the job
|
||||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
|
||||
// TODO: Implement HeroScript processing
|
||||
errMsg := "HeroScript processing not implemented yet"
|
||||
log.Printf("Error processing job %s: %s", job.JobID, errMsg)
|
||||
|
||||
// Update job status in OurDB
|
||||
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
||||
|
||||
// Also update Redis for backward compatibility
|
||||
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
|
||||
log.Printf("Error updating job error in Redis: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// processRhaiScript processes a job with RhaiScript parameters
|
||||
func (d *WatchDog) processRhaiScript(ctx context.Context, job *Job) {
|
||||
// Start a goroutine to process the job
|
||||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
|
||||
// TODO: Implement RhaiScript processing
|
||||
errMsg := "RhaiScript processing not implemented yet"
|
||||
log.Printf("Error processing job %s: %s", job.JobID, errMsg)
|
||||
|
||||
// Update job status in OurDB
|
||||
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
||||
|
||||
// Also update Redis for backward compatibility
|
||||
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
|
||||
log.Printf("Error updating job error in Redis: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// processOpenRPC processes a job with OpenRPC parameters
|
||||
func (d *WatchDog) processOpenRPC(ctx context.Context, job *Job) {
|
||||
// Start a goroutine to process the job
|
||||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
|
||||
// TODO: Implement OpenRPC processing
|
||||
errMsg := "OpenRPC processing not implemented yet"
|
||||
log.Printf("Error processing job %s: %s", job.JobID, errMsg)
|
||||
|
||||
// Update job status in OurDB
|
||||
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
||||
|
||||
// Also update Redis for backward compatibility
|
||||
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
|
||||
log.Printf("Error updating job error in Redis: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// processAI processes a job with AI parameters
|
||||
func (d *WatchDog) processAI(ctx context.Context, job *Job) {
|
||||
// Start a goroutine to process the job
|
||||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
|
||||
// TODO: Implement AI processing
|
||||
errMsg := "AI processing not implemented yet"
|
||||
log.Printf("Error processing job %s: %s", job.JobID, errMsg)
|
||||
|
||||
// Update job status in OurDB
|
||||
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
||||
|
||||
// Also update Redis for backward compatibility
|
||||
if err := d.redisClient.UpdateJobError(job.JobID, errMsg); err != nil {
|
||||
log.Printf("Error updating job error in Redis: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
402
pkg/herojobs/redis.go
Normal file
402
pkg/herojobs/redis.go
Normal file
@@ -0,0 +1,402 @@
|
||||
package herojobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/freeflowuniverse/heroagent/pkg/tools"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// RedisClient handles Redis operations for jobs
|
||||
type RedisClient struct {
|
||||
client *redis.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// NewRedisClient creates a new Redis client
|
||||
func NewRedisClient(addr string, isUnixSocket bool) (*RedisClient, error) {
|
||||
// Determine network type
|
||||
networkType := "tcp"
|
||||
if isUnixSocket {
|
||||
networkType = "unix"
|
||||
}
|
||||
|
||||
// Create Redis client
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Network: networkType,
|
||||
Addr: addr,
|
||||
DB: 0,
|
||||
DialTimeout: 5 * time.Second,
|
||||
ReadTimeout: 5 * time.Second,
|
||||
WriteTimeout: 5 * time.Second,
|
||||
})
|
||||
|
||||
// Test connection
|
||||
ctx := context.Background()
|
||||
_, err := client.Ping(ctx).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
|
||||
}
|
||||
|
||||
return &RedisClient{
|
||||
client: client,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the Redis client
|
||||
func (r *RedisClient) Close() error {
|
||||
return r.client.Close()
|
||||
}
|
||||
|
||||
// StoreJob stores a job in Redis
|
||||
func (r *RedisClient) StoreJob(job *Job) error {
|
||||
// Convert job to JSON
|
||||
jobJSON, err := job.ToJSON()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store job in Redis
|
||||
err = r.client.HSet(r.ctx, job.StorageKey(), "job", jobJSON).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store job: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetJob retrieves a job from Redis by ID
|
||||
func (r *RedisClient) GetJob(jobID interface{}) (*Job, error) {
|
||||
// For interface{} type, we need more information to construct the storage key
|
||||
// Two approaches:
|
||||
// 1. For numeric JobID only (no circle or topic), use legacy format
|
||||
// 2. For full key with pattern "circleID:topic:jobID", parse out components
|
||||
|
||||
var storageKey string
|
||||
|
||||
// Handle different types of jobID
|
||||
switch id := jobID.(type) {
|
||||
case uint32:
|
||||
// Legacy format for backward compatibility
|
||||
storageKey = fmt.Sprintf("herojobs:%d", id)
|
||||
case string:
|
||||
// Check if this is a composite key (circleID:topic:jobID)
|
||||
parts := strings.Split(id, ":")
|
||||
if len(parts) == 3 {
|
||||
// This is a composite key
|
||||
circleID := tools.NameFix(parts[0])
|
||||
topic := tools.NameFix(parts[1])
|
||||
|
||||
// Try to convert last part to uint32
|
||||
var numericID uint32
|
||||
if _, err := fmt.Sscanf(parts[2], "%d", &numericID); err == nil {
|
||||
storageKey = fmt.Sprintf("herojobs:%s:%s:%d", circleID, topic, numericID)
|
||||
} else {
|
||||
// Legacy string ID format in composite key
|
||||
storageKey = fmt.Sprintf("herojobs:%s:%s:%s", circleID, topic, parts[2])
|
||||
}
|
||||
} else {
|
||||
// Try to convert string to uint32 (legacy format)
|
||||
var numericID uint32
|
||||
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
|
||||
storageKey = fmt.Sprintf("herojobs:%d", numericID)
|
||||
} else {
|
||||
// Legacy string ID format
|
||||
storageKey = fmt.Sprintf("herojobs:%s", id)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported job ID type: %T", jobID)
|
||||
}
|
||||
|
||||
// Get job from Redis
|
||||
jobJSON, err := r.client.HGet(r.ctx, storageKey, "job").Result()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, fmt.Errorf("job not found: %v", jobID)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get job: %w", err)
|
||||
}
|
||||
|
||||
// Parse job JSON
|
||||
job, err := NewJobFromJSON(jobJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// DeleteJob deletes a job from Redis
|
||||
func (r *RedisClient) DeleteJob(jobID interface{}) error {
|
||||
// Handle different types of jobID similar to GetJob
|
||||
var storageKey string
|
||||
|
||||
switch id := jobID.(type) {
|
||||
case uint32:
|
||||
// Legacy format for backward compatibility
|
||||
storageKey = fmt.Sprintf("herojobs:%d", id)
|
||||
case string:
|
||||
// Check if this is a composite key (circleID:topic:jobID)
|
||||
parts := strings.Split(id, ":")
|
||||
if len(parts) == 3 {
|
||||
// This is a composite key
|
||||
circleID := tools.NameFix(parts[0])
|
||||
topic := tools.NameFix(parts[1])
|
||||
|
||||
// Try to convert last part to uint32
|
||||
var numericID uint32
|
||||
if _, err := fmt.Sscanf(parts[2], "%d", &numericID); err == nil {
|
||||
storageKey = fmt.Sprintf("herojobs:%s:%s:%d", circleID, topic, numericID)
|
||||
} else {
|
||||
// Legacy string ID format in composite key
|
||||
storageKey = fmt.Sprintf("herojobs:%s:%s:%s", circleID, topic, parts[2])
|
||||
}
|
||||
} else {
|
||||
// Try to convert string to uint32 (legacy format)
|
||||
var numericID uint32
|
||||
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
|
||||
storageKey = fmt.Sprintf("herojobs:%d", numericID)
|
||||
} else {
|
||||
// Legacy string ID format
|
||||
storageKey = fmt.Sprintf("herojobs:%s", id)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unsupported job ID type: %T", jobID)
|
||||
}
|
||||
|
||||
// Delete job from Redis
|
||||
err := r.client.Del(r.ctx, storageKey).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete job: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnqueueJob adds a job to its queue
|
||||
func (r *RedisClient) EnqueueJob(job *Job) error {
|
||||
// Store the job first
|
||||
err := r.StoreJob(job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add job ID to queue
|
||||
err = r.client.RPush(r.ctx, job.QueueKey(), job.JobID).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to enqueue job: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueueSize returns the size of a queue
|
||||
func (r *RedisClient) QueueSize(circleID, topic string) (int64, error) {
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(circleID)
|
||||
fixedTopic := tools.NameFix(topic)
|
||||
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
|
||||
|
||||
// Get queue size
|
||||
size, err := r.client.LLen(r.ctx, queueKey).Result()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get queue size: %w", err)
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
// QueueEmpty empties a queue and deletes all corresponding jobs
|
||||
func (r *RedisClient) QueueEmpty(circleID, topic string) error {
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(circleID)
|
||||
fixedTopic := tools.NameFix(topic)
|
||||
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
|
||||
|
||||
// Get all job IDs from queue
|
||||
jobIDs, err := r.client.LRange(r.ctx, queueKey, 0, -1).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get job IDs from queue: %w", err)
|
||||
}
|
||||
|
||||
// Delete all jobs
|
||||
for _, jobIDStr := range jobIDs {
|
||||
// Convert string ID to uint32 if possible
|
||||
var jobID uint32
|
||||
if _, err := fmt.Sscanf(jobIDStr, "%d", &jobID); err == nil {
|
||||
// New format with CircleID and Topic
|
||||
storageKey := fmt.Sprintf("herojobs:%s:%s:%d", fixedCircleID, fixedTopic, jobID)
|
||||
err := r.client.Del(r.ctx, storageKey).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete job %d: %w", jobID, err)
|
||||
}
|
||||
} else {
|
||||
// Handle legacy string IDs
|
||||
storageKey := fmt.Sprintf("herojobs:%s", jobIDStr)
|
||||
err := r.client.Del(r.ctx, storageKey).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete job %s: %w", jobIDStr, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Empty queue
|
||||
err = r.client.Del(r.ctx, queueKey).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to empty queue: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueueGet gets the last job from a queue without removing it
|
||||
func (r *RedisClient) QueueGet(circleID, topic string) (*Job, error) {
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(circleID)
|
||||
fixedTopic := tools.NameFix(topic)
|
||||
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
|
||||
|
||||
// Get last job ID from queue
|
||||
jobID, err := r.client.LIndex(r.ctx, queueKey, -1).Result()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, fmt.Errorf("queue is empty")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get job ID from queue: %w", err)
|
||||
}
|
||||
|
||||
// Get job with context about circle and topic
|
||||
compositeID := fmt.Sprintf("%s:%s:%s", circleID, topic, jobID)
|
||||
return r.GetJob(compositeID)
|
||||
}
|
||||
|
||||
// QueueFetch gets and removes the last job from a queue
|
||||
func (r *RedisClient) QueueFetch(circleID, topic string) (*Job, error) {
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(circleID)
|
||||
fixedTopic := tools.NameFix(topic)
|
||||
queueKey := fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
|
||||
|
||||
// Get and remove last job ID from queue
|
||||
jobID, err := r.client.RPop(r.ctx, queueKey).Result()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, fmt.Errorf("queue is empty")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to fetch job ID from queue: %w", err)
|
||||
}
|
||||
|
||||
// Get job with context about circle and topic
|
||||
compositeID := fmt.Sprintf("%s:%s:%s", circleID, topic, jobID)
|
||||
return r.GetJob(compositeID)
|
||||
}
|
||||
|
||||
// ListJobs lists job IDs by circle and topic
|
||||
func (r *RedisClient) ListJobs(circleID, topic string) ([]uint32, error) {
|
||||
var pattern string
|
||||
|
||||
// Apply name fixing to CircleID and Topic
|
||||
fixedCircleID := tools.NameFix(circleID)
|
||||
fixedTopic := tools.NameFix(topic)
|
||||
|
||||
if circleID != "" && topic != "" {
|
||||
pattern = fmt.Sprintf("heroqueue:%s:%s", fixedCircleID, fixedTopic)
|
||||
} else if circleID != "" {
|
||||
pattern = fmt.Sprintf("heroqueue:%s:*", fixedCircleID)
|
||||
} else if topic != "" {
|
||||
pattern = fmt.Sprintf("heroqueue:*:%s", fixedTopic)
|
||||
} else {
|
||||
pattern = "heroqueue:*:*"
|
||||
}
|
||||
|
||||
// Get all matching queue keys
|
||||
queueKeys, err := r.client.Keys(r.ctx, pattern).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list queues: %w", err)
|
||||
}
|
||||
|
||||
var jobIDs []uint32
|
||||
for _, queueKey := range queueKeys {
|
||||
// Get all job IDs from queue
|
||||
stringIDs, err := r.client.LRange(r.ctx, queueKey, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get job IDs from queue %s: %w", queueKey, err)
|
||||
}
|
||||
|
||||
// Convert string IDs to uint32
|
||||
for _, idStr := range stringIDs {
|
||||
var id uint32
|
||||
if _, err := fmt.Sscanf(idStr, "%d", &id); err == nil {
|
||||
jobIDs = append(jobIDs, id)
|
||||
} else {
|
||||
// Log but continue - this handles legacy string IDs that can't be converted
|
||||
fmt.Printf("Warning: Found job ID that couldn't be converted to uint32: %s\n", idStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return jobIDs, nil
|
||||
}
|
||||
|
||||
// UpdateJobStatus updates the status of a job
|
||||
func (r *RedisClient) UpdateJobStatus(jobID uint32, status JobStatus) error {
|
||||
// Get job
|
||||
job, err := r.GetJob(jobID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update status
|
||||
job.Status = status
|
||||
|
||||
// Update timestamps based on status
|
||||
now := time.Now().Unix()
|
||||
if status == JobStatusActive && job.TimeStart == 0 {
|
||||
job.TimeStart = now
|
||||
} else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 {
|
||||
job.TimeEnd = now
|
||||
}
|
||||
|
||||
// Store updated job
|
||||
return r.StoreJob(job)
|
||||
}
|
||||
|
||||
// UpdateJobResult updates the result of a job
|
||||
func (r *RedisClient) UpdateJobResult(jobID uint32, result string) error {
|
||||
// Get job
|
||||
job, err := r.GetJob(jobID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update result
|
||||
job.Result = result
|
||||
job.Status = JobStatusDone
|
||||
job.TimeEnd = time.Now().Unix()
|
||||
|
||||
// Store updated job
|
||||
return r.StoreJob(job)
|
||||
}
|
||||
|
||||
// UpdateJobError updates the error of a job
|
||||
func (r *RedisClient) UpdateJobError(jobID uint32, errorMsg string) error {
|
||||
// Get job
|
||||
job, err := r.GetJob(jobID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update error
|
||||
job.Error = errorMsg
|
||||
job.Status = JobStatusError
|
||||
job.TimeEnd = time.Now().Unix()
|
||||
|
||||
// Store updated job
|
||||
return r.StoreJob(job)
|
||||
}
|
115
pkg/herojobs/watchdog.go
Normal file
115
pkg/herojobs/watchdog.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package herojobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// JobProcessor is a function type that processes a job
|
||||
// WatchDog handles the processing of jobs from queues
|
||||
type WatchDog struct {
|
||||
redisClient *RedisClient
|
||||
processorMutex sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewWatchDog creates a new watchdog
|
||||
func NewWatchDog(redisClient *RedisClient) *WatchDog {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &WatchDog{
|
||||
redisClient: redisClient,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the watchdog
|
||||
func (d *WatchDog) Start() {
|
||||
d.wg.Add(1)
|
||||
go d.processQueues()
|
||||
}
|
||||
|
||||
// Stop stops the watchdog
|
||||
func (d *WatchDog) Stop() {
|
||||
d.cancel()
|
||||
d.wg.Wait()
|
||||
}
|
||||
|
||||
// processQueues processes all queues
|
||||
func (d *WatchDog) processQueues() {
|
||||
defer d.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-d.ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Get all queues
|
||||
queues, err := d.getQueues()
|
||||
if err != nil {
|
||||
log.Printf("Error getting queues: %v", err)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
// Process each queue
|
||||
processed := false
|
||||
for _, queue := range queues {
|
||||
circleID, topic := queue.circleID, queue.topic
|
||||
job, err := d.redisClient.QueueFetch(circleID, topic)
|
||||
if err != nil {
|
||||
// Queue is empty, continue to next queue
|
||||
continue
|
||||
}
|
||||
|
||||
// Process the job
|
||||
processed = true
|
||||
d.processJob(job)
|
||||
}
|
||||
|
||||
// If no jobs were processed, wait before checking again
|
||||
if !processed {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// queueInfo represents a queue
|
||||
type queueInfo struct {
|
||||
circleID string
|
||||
topic string
|
||||
}
|
||||
|
||||
// getQueues returns all queues
|
||||
func (d *WatchDog) getQueues() ([]queueInfo, error) {
|
||||
// Get all queue keys from Redis
|
||||
queueKeys, err := d.redisClient.client.Keys(d.redisClient.ctx, "heroqueue:*:*").Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list queues: %w", err)
|
||||
}
|
||||
|
||||
var queues []queueInfo
|
||||
for _, queueKey := range queueKeys {
|
||||
// Parse queue key (format: heroqueue:<circleID>:<topic>)
|
||||
// Split the key by ":"
|
||||
parts := strings.Split(queueKey, ":")
|
||||
if len(parts) != 3 {
|
||||
log.Printf("Invalid queue key format: %s", queueKey)
|
||||
continue
|
||||
}
|
||||
|
||||
queues = append(queues, queueInfo{
|
||||
circleID: parts[1],
|
||||
topic: parts[2],
|
||||
})
|
||||
}
|
||||
|
||||
return queues, nil
|
||||
}
|
Reference in New Issue
Block a user