...
This commit is contained in:
parent
5d241e9ade
commit
e60b9f62f1
@ -19,8 +19,8 @@ func main() {
|
|||||||
|
|
||||||
// Customize configuration if needed
|
// Customize configuration if needed
|
||||||
config.Redis.TCPPort = 6379
|
config.Redis.TCPPort = 6379
|
||||||
config.Redis.UnixSocketPath = "/tmp/redis.sock"
|
config.Redis.UnixSocketPath = "" // Use TCP connection only
|
||||||
config.Jobs.OurDBPath = "./data/jobsdb"
|
config.Jobs.OurDBPath = "/tmp/jobsdb"
|
||||||
config.Jobs.WorkerCount = 3
|
config.Jobs.WorkerCount = 3
|
||||||
config.Jobs.QueuePollInterval = 200 * time.Millisecond
|
config.Jobs.QueuePollInterval = 200 * time.Millisecond
|
||||||
|
|
||||||
|
@ -22,11 +22,14 @@ The job system follows a specific flow:
|
|||||||
1. **Job Creation**:
|
1. **Job Creation**:
|
||||||
- When a job is created, it's stored in both OurDB and Redis
|
- When a job is created, it's stored in both OurDB and Redis
|
||||||
- OurDB provides persistent storage with history tracking
|
- OurDB provides persistent storage with history tracking
|
||||||
- Redis provides fast access and queuing capabilities
|
- Redis stores the job data and adds the job ID to a queue for processing
|
||||||
|
- Each job is stored in Redis using a key pattern: `herojobs:<topic>:<jobID>`
|
||||||
|
- Each job ID is added to a queue using a key pattern: `heroqueue:<topic>`
|
||||||
|
|
||||||
2. **Job Processing**:
|
2. **Job Processing**:
|
||||||
- Workers continuously poll Redis queues for new jobs
|
- Workers continuously poll Redis queues for new jobs
|
||||||
- When a job is found, it's updated to "active" status in both OurDB and Redis
|
- When a job is found, it's fetched from Redis and updated to "active" status
|
||||||
|
- The updated job is stored in both OurDB and Redis
|
||||||
- The job is processed based on its parameters
|
- The job is processed based on its parameters
|
||||||
|
|
||||||
3. **Job Completion**:
|
3. **Job Completion**:
|
||||||
@ -34,6 +37,60 @@ The job system follows a specific flow:
|
|||||||
- The job is removed from Redis to keep only active jobs in memory
|
- The job is removed from Redis to keep only active jobs in memory
|
||||||
- This approach ensures efficient memory usage while maintaining a complete history
|
- This approach ensures efficient memory usage while maintaining a complete history
|
||||||
|
|
||||||
|
### Data Flow Diagram
|
||||||
|
|
||||||
|
```
|
||||||
|
Job Creation:
|
||||||
|
┌─────────┐ ┌─────────┐ ┌─────────┐
|
||||||
|
│ Client │────▶│ OurDB │ │ Redis │
|
||||||
|
└─────────┘ └────┬────┘ └────┬────┘
|
||||||
|
│ │
|
||||||
|
│ Store Job │ Store Job
|
||||||
|
│ │
|
||||||
|
▼ ▼
|
||||||
|
┌─────────┐ ┌─────────┐
|
||||||
|
│ Job Data│ │ Job Data│
|
||||||
|
└─────────┘ └─────────┘
|
||||||
|
│
|
||||||
|
│ Add to Queue
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────┐
|
||||||
|
│ Queue │
|
||||||
|
└─────────┘
|
||||||
|
|
||||||
|
Job Processing:
|
||||||
|
┌─────────┐ ┌─────────┐ ┌─────────┐
|
||||||
|
│ Worker │────▶│ Redis │────▶│ OurDB │
|
||||||
|
└─────────┘ └────┬────┘ └────┬────┘
|
||||||
|
│ │
|
||||||
|
│ Fetch Job │ Update Job
|
||||||
|
│ from Queue │
|
||||||
|
▼ ▼
|
||||||
|
┌─────────┐ ┌─────────┐
|
||||||
|
│ Job Data│ │ Job Data│
|
||||||
|
└─────────┘ └─────────┘
|
||||||
|
│
|
||||||
|
│ Process Job
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────┐
|
||||||
|
│ Result │
|
||||||
|
└─────────┘
|
||||||
|
|
||||||
|
Job Completion:
|
||||||
|
┌─────────┐ ┌─────────┐ ┌─────────┐
|
||||||
|
│ Worker │────▶│ OurDB │ │ Redis │
|
||||||
|
└─────────┘ └────┬────┘ └────┬────┘
|
||||||
|
│ │
|
||||||
|
│ Update Job │ Remove Job
|
||||||
|
│ │
|
||||||
|
▼ ▼
|
||||||
|
┌─────────┐ ┌─────────┐
|
||||||
|
│ Job Data│ │ ✓ │
|
||||||
|
└─────────┘ └─────────┘
|
||||||
|
```
|
||||||
|
|
||||||
### Components
|
### Components
|
||||||
|
|
||||||
- **JobManager**: Coordinates job operations between OurDB and Redis
|
- **JobManager**: Coordinates job operations between OurDB and Redis
|
||||||
|
@ -62,8 +62,8 @@ type JobsConfig struct {
|
|||||||
func DefaultConfig() Config {
|
func DefaultConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
Redis: RedisConfig{
|
Redis: RedisConfig{
|
||||||
TCPPort: 6378,
|
TCPPort: 6379,
|
||||||
UnixSocketPath: "/tmp/redis.sock",
|
UnixSocketPath: "", // Empty string means use TCP only
|
||||||
},
|
},
|
||||||
WebDAV: WebDAVConfig{
|
WebDAV: WebDAVConfig{
|
||||||
Config: webdavserver.DefaultConfig(),
|
Config: webdavserver.DefaultConfig(),
|
||||||
|
351
pkg/servers/heroagent/jobmanager.go
Normal file
351
pkg/servers/heroagent/jobmanager.go
Normal file
@ -0,0 +1,351 @@
|
|||||||
|
package heroagent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.ourworld.tf/herocode/heroagent/pkg/data/ourdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JobManager handles job management between OurDB and Redis
|
||||||
|
type JobManager struct {
|
||||||
|
config JobsConfig
|
||||||
|
ourDB *ourdb.OurDB
|
||||||
|
redisConn *RedisConnection
|
||||||
|
redisMgr *RedisJobManager
|
||||||
|
workers []*JobWorker
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJobManager creates a new job manager
|
||||||
|
func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) {
|
||||||
|
// Create OurDB directory if it doesn't exist
|
||||||
|
if err := os.MkdirAll(config.OurDBPath, 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create OurDB directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize OurDB
|
||||||
|
ourDBConfig := ourdb.DefaultConfig()
|
||||||
|
ourDBConfig.Path = config.OurDBPath
|
||||||
|
ourDBConfig.IncrementalMode = true
|
||||||
|
|
||||||
|
db, err := ourdb.New(ourDBConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize Redis job manager
|
||||||
|
redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath)
|
||||||
|
if err != nil {
|
||||||
|
// Close OurDB before returning error
|
||||||
|
if closeErr := db.Close(); closeErr != nil {
|
||||||
|
log.Printf("Warning: failed to close OurDB: %v", closeErr)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("failed to create Redis job manager: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create context with cancel
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// Create job manager
|
||||||
|
jobMgr := &JobManager{
|
||||||
|
config: config,
|
||||||
|
ourDB: db,
|
||||||
|
redisConn: redisConn,
|
||||||
|
redisMgr: redisMgr,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobMgr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the job manager
|
||||||
|
func (jm *JobManager) Start() error {
|
||||||
|
log.Println("Starting job manager...")
|
||||||
|
|
||||||
|
// Start workers
|
||||||
|
jm.workers = make([]*JobWorker, jm.config.WorkerCount)
|
||||||
|
for i := 0; i < jm.config.WorkerCount; i++ {
|
||||||
|
worker := &JobWorker{
|
||||||
|
id: i,
|
||||||
|
jobMgr: jm,
|
||||||
|
ctx: jm.ctx,
|
||||||
|
wg: &jm.wg,
|
||||||
|
}
|
||||||
|
jm.workers[i] = worker
|
||||||
|
jm.startWorker(worker)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Job manager started with %d workers", jm.config.WorkerCount)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the job manager
|
||||||
|
func (jm *JobManager) Stop() error {
|
||||||
|
log.Println("Stopping job manager...")
|
||||||
|
|
||||||
|
// Signal all workers to stop
|
||||||
|
jm.cancel()
|
||||||
|
|
||||||
|
// Wait for all workers to finish
|
||||||
|
jm.wg.Wait()
|
||||||
|
|
||||||
|
// Close Redis job manager
|
||||||
|
if jm.redisMgr != nil {
|
||||||
|
if err := jm.redisMgr.Close(); err != nil {
|
||||||
|
log.Printf("Warning: failed to close Redis job manager: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close OurDB
|
||||||
|
if err := jm.ourDB.Close(); err != nil {
|
||||||
|
// Log the error but don't fail the shutdown
|
||||||
|
log.Printf("Warning: failed to close OurDB: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Job manager stopped")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// startWorker starts a worker
|
||||||
|
func (jm *JobManager) startWorker(worker *JobWorker) {
|
||||||
|
jm.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer jm.wg.Done()
|
||||||
|
worker.run()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateJob creates a new job
|
||||||
|
func (jm *JobManager) CreateJob(topic, params string) (*Job, error) {
|
||||||
|
// Create new job
|
||||||
|
job := &Job{
|
||||||
|
Topic: topic,
|
||||||
|
Params: params,
|
||||||
|
Status: JobStatusNew,
|
||||||
|
TimeScheduled: time.Now().Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store job in OurDB
|
||||||
|
jobData, err := json.Marshal(job)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add job to OurDB with auto-incremented ID
|
||||||
|
id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{
|
||||||
|
Data: jobData,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to store job in OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job with assigned ID
|
||||||
|
job.JobID = id
|
||||||
|
|
||||||
|
// Store job in Redis and add to queue
|
||||||
|
if err := jm.redisMgr.EnqueueJob(job); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to store job in Redis: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID)
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJob retrieves a job by ID
|
||||||
|
func (jm *JobManager) GetJob(jobID uint32) (*Job, error) {
|
||||||
|
// Get job from OurDB
|
||||||
|
jobData, err := jm.ourDB.Get(jobID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get job from OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse job data
|
||||||
|
job := &Job{}
|
||||||
|
if err := json.Unmarshal(jobData, job); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal job data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateJobStatus updates the status of a job
|
||||||
|
func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error {
|
||||||
|
// Get job from OurDB
|
||||||
|
job, err := jm.GetJob(jobID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update status
|
||||||
|
job.Status = status
|
||||||
|
|
||||||
|
// Update timestamps based on status
|
||||||
|
now := time.Now().Unix()
|
||||||
|
if status == JobStatusActive && job.TimeStart == 0 {
|
||||||
|
job.TimeStart = now
|
||||||
|
} else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 {
|
||||||
|
job.TimeEnd = now
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store updated job in OurDB
|
||||||
|
jobData, err := json.Marshal(job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job in OurDB
|
||||||
|
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
||||||
|
ID: &jobID,
|
||||||
|
Data: jobData,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If job is done or has error, remove from Redis
|
||||||
|
if status == JobStatusDone || status == JobStatusError {
|
||||||
|
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
|
||||||
|
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Otherwise, update in Redis
|
||||||
|
if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
|
||||||
|
log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompleteJob marks a job as completed
|
||||||
|
func (jm *JobManager) CompleteJob(jobID uint32, result string) error {
|
||||||
|
// Get job from OurDB
|
||||||
|
job, err := jm.GetJob(jobID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job
|
||||||
|
job.Status = JobStatusDone
|
||||||
|
job.TimeEnd = time.Now().Unix()
|
||||||
|
job.Result = result
|
||||||
|
|
||||||
|
// Store updated job in OurDB
|
||||||
|
jobData, err := json.Marshal(job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job in OurDB
|
||||||
|
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
||||||
|
ID: &jobID,
|
||||||
|
Data: jobData,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from Redis
|
||||||
|
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
|
||||||
|
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Job %d completed and removed from Redis", jobID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FailJob marks a job as failed
|
||||||
|
func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error {
|
||||||
|
// Get job from OurDB
|
||||||
|
job, err := jm.GetJob(jobID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job
|
||||||
|
job.Status = JobStatusError
|
||||||
|
job.TimeEnd = time.Now().Unix()
|
||||||
|
job.Error = errorMsg
|
||||||
|
|
||||||
|
// Store updated job in OurDB
|
||||||
|
jobData, err := json.Marshal(job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job in OurDB
|
||||||
|
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
||||||
|
ID: &jobID,
|
||||||
|
Data: jobData,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from Redis
|
||||||
|
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
|
||||||
|
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Job %d failed and removed from Redis", jobID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateJobInBothStores updates a job in both OurDB and Redis
|
||||||
|
func (jm *JobManager) updateJobInBothStores(job *Job) error {
|
||||||
|
// Store job in OurDB
|
||||||
|
jobData, err := json.Marshal(job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job in OurDB
|
||||||
|
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
||||||
|
ID: &job.JobID,
|
||||||
|
Data: jobData,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job in Redis
|
||||||
|
if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
|
||||||
|
return fmt.Errorf("failed to update job in Redis: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// completeJobProcessing updates a completed job in OurDB and removes it from Redis
|
||||||
|
func (jm *JobManager) completeJobProcessing(job *Job) error {
|
||||||
|
// Store job in OurDB
|
||||||
|
jobData, err := json.Marshal(job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update job in OurDB
|
||||||
|
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
||||||
|
ID: &job.JobID,
|
||||||
|
Data: jobData,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from Redis
|
||||||
|
if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil {
|
||||||
|
return fmt.Errorf("failed to remove job from Redis: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -2,14 +2,10 @@ package heroagent
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.ourworld.tf/herocode/heroagent/pkg/data/ourdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// JobStatus represents the status of a job
|
// JobStatus represents the status of a job
|
||||||
@ -39,18 +35,6 @@ type Job struct {
|
|||||||
Result string `json:"result"`
|
Result string `json:"result"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// JobManager handles job management between OurDB and Redis
|
|
||||||
type JobManager struct {
|
|
||||||
config JobsConfig
|
|
||||||
ourDB *ourdb.OurDB
|
|
||||||
redisConn *RedisConnection
|
|
||||||
redisMgr *RedisJobManager
|
|
||||||
workers []*JobWorker
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
wg sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
// RedisConnection wraps Redis connection details
|
// RedisConnection wraps Redis connection details
|
||||||
type RedisConnection struct {
|
type RedisConnection struct {
|
||||||
TCPPort int
|
TCPPort int
|
||||||
@ -65,105 +49,6 @@ type JobWorker struct {
|
|||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJobManager creates a new job manager
|
|
||||||
func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) {
|
|
||||||
// Create OurDB directory if it doesn't exist
|
|
||||||
if err := os.MkdirAll(config.OurDBPath, 0755); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create OurDB directory: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize OurDB
|
|
||||||
ourDBConfig := ourdb.DefaultConfig()
|
|
||||||
ourDBConfig.Path = config.OurDBPath
|
|
||||||
ourDBConfig.IncrementalMode = true
|
|
||||||
|
|
||||||
db, err := ourdb.New(ourDBConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create context with cancel
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
// Initialize Redis job manager
|
|
||||||
redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath)
|
|
||||||
if err != nil {
|
|
||||||
// Close OurDB before returning error
|
|
||||||
if closeErr := db.Close(); closeErr != nil {
|
|
||||||
log.Printf("Warning: failed to close OurDB: %v", closeErr)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("failed to create Redis job manager: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create job manager
|
|
||||||
jobMgr := &JobManager{
|
|
||||||
config: config,
|
|
||||||
ourDB: db,
|
|
||||||
redisConn: redisConn,
|
|
||||||
redisMgr: redisMgr,
|
|
||||||
ctx: ctx,
|
|
||||||
cancel: cancel,
|
|
||||||
}
|
|
||||||
|
|
||||||
return jobMgr, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start starts the job manager
|
|
||||||
func (jm *JobManager) Start() error {
|
|
||||||
log.Println("Starting job manager...")
|
|
||||||
|
|
||||||
// Start workers
|
|
||||||
jm.workers = make([]*JobWorker, jm.config.WorkerCount)
|
|
||||||
for i := 0; i < jm.config.WorkerCount; i++ {
|
|
||||||
worker := &JobWorker{
|
|
||||||
id: i,
|
|
||||||
jobMgr: jm,
|
|
||||||
ctx: jm.ctx,
|
|
||||||
wg: &jm.wg,
|
|
||||||
}
|
|
||||||
jm.workers[i] = worker
|
|
||||||
jm.startWorker(worker)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Job manager started with %d workers", jm.config.WorkerCount)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop stops the job manager
|
|
||||||
func (jm *JobManager) Stop() error {
|
|
||||||
log.Println("Stopping job manager...")
|
|
||||||
|
|
||||||
// Signal all workers to stop
|
|
||||||
jm.cancel()
|
|
||||||
|
|
||||||
// Wait for all workers to finish
|
|
||||||
jm.wg.Wait()
|
|
||||||
|
|
||||||
// Close Redis job manager
|
|
||||||
if jm.redisMgr != nil {
|
|
||||||
if err := jm.redisMgr.Close(); err != nil {
|
|
||||||
log.Printf("Warning: failed to close Redis job manager: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close OurDB
|
|
||||||
if err := jm.ourDB.Close(); err != nil {
|
|
||||||
return fmt.Errorf("failed to close OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("Job manager stopped")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// startWorker starts a worker
|
|
||||||
func (jm *JobManager) startWorker(worker *JobWorker) {
|
|
||||||
jm.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer jm.wg.Done()
|
|
||||||
worker.run()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is the main worker loop
|
// run is the main worker loop
|
||||||
func (w *JobWorker) run() {
|
func (w *JobWorker) run() {
|
||||||
log.Printf("Worker %d started", w.id)
|
log.Printf("Worker %d started", w.id)
|
||||||
@ -244,229 +129,3 @@ func (w *JobWorker) processJob(job *Job) error {
|
|||||||
log.Printf("Worker %d completed job %d", w.id, job.JobID)
|
log.Printf("Worker %d completed job %d", w.id, job.JobID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateJob creates a new job
|
|
||||||
func (jm *JobManager) CreateJob(topic, params string) (*Job, error) {
|
|
||||||
// Create new job
|
|
||||||
job := &Job{
|
|
||||||
Topic: topic,
|
|
||||||
Params: params,
|
|
||||||
Status: JobStatusNew,
|
|
||||||
TimeScheduled: time.Now().Unix(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store job in OurDB
|
|
||||||
jobData, err := json.Marshal(job)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to marshal job: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add job to OurDB with auto-incremented ID
|
|
||||||
id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{
|
|
||||||
Data: jobData,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to store job in OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job with assigned ID
|
|
||||||
job.JobID = id
|
|
||||||
|
|
||||||
// Store job in Redis
|
|
||||||
if err := jm.redisMgr.EnqueueJob(job); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to store job in Redis: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID)
|
|
||||||
return job, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetJob retrieves a job by ID
|
|
||||||
func (jm *JobManager) GetJob(jobID uint32) (*Job, error) {
|
|
||||||
// Get job from OurDB
|
|
||||||
jobData, err := jm.ourDB.Get(jobID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get job from OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse job data
|
|
||||||
job := &Job{}
|
|
||||||
if err := json.Unmarshal(jobData, job); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to unmarshal job data: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return job, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateJobStatus updates the status of a job
|
|
||||||
func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error {
|
|
||||||
// Get job from OurDB
|
|
||||||
job, err := jm.GetJob(jobID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update status
|
|
||||||
job.Status = status
|
|
||||||
|
|
||||||
// Update timestamps based on status
|
|
||||||
now := time.Now().Unix()
|
|
||||||
if status == JobStatusActive && job.TimeStart == 0 {
|
|
||||||
job.TimeStart = now
|
|
||||||
} else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 {
|
|
||||||
job.TimeEnd = now
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store updated job in OurDB
|
|
||||||
jobData, err := json.Marshal(job)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal job: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job in OurDB
|
|
||||||
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
|
||||||
ID: &jobID,
|
|
||||||
Data: jobData,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If job is done or has error, remove from Redis
|
|
||||||
if status == JobStatusDone || status == JobStatusError {
|
|
||||||
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
|
|
||||||
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Otherwise, update in Redis
|
|
||||||
if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
|
|
||||||
log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CompleteJob marks a job as completed
|
|
||||||
func (jm *JobManager) CompleteJob(jobID uint32, result string) error {
|
|
||||||
// Get job from OurDB
|
|
||||||
job, err := jm.GetJob(jobID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job
|
|
||||||
job.Status = JobStatusDone
|
|
||||||
job.TimeEnd = time.Now().Unix()
|
|
||||||
job.Result = result
|
|
||||||
|
|
||||||
// Store updated job in OurDB
|
|
||||||
jobData, err := json.Marshal(job)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal job: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job in OurDB
|
|
||||||
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
|
||||||
ID: &jobID,
|
|
||||||
Data: jobData,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from Redis
|
|
||||||
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
|
|
||||||
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Job %d completed and removed from Redis", jobID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FailJob marks a job as failed
|
|
||||||
func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error {
|
|
||||||
// Get job from OurDB
|
|
||||||
job, err := jm.GetJob(jobID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job
|
|
||||||
job.Status = JobStatusError
|
|
||||||
job.TimeEnd = time.Now().Unix()
|
|
||||||
job.Error = errorMsg
|
|
||||||
|
|
||||||
// Store updated job in OurDB
|
|
||||||
jobData, err := json.Marshal(job)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal job: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job in OurDB
|
|
||||||
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
|
||||||
ID: &jobID,
|
|
||||||
Data: jobData,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from Redis
|
|
||||||
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
|
|
||||||
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Job %d failed and removed from Redis", jobID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateJobInBothStores updates a job in both OurDB and Redis
|
|
||||||
func (jm *JobManager) updateJobInBothStores(job *Job) error {
|
|
||||||
// Store job in OurDB
|
|
||||||
jobData, err := json.Marshal(job)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal job: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job in OurDB
|
|
||||||
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
|
||||||
ID: &job.JobID,
|
|
||||||
Data: jobData,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job in Redis
|
|
||||||
if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
|
|
||||||
return fmt.Errorf("failed to update job in Redis: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// completeJobProcessing updates a completed job in OurDB and removes it from Redis
|
|
||||||
func (jm *JobManager) completeJobProcessing(job *Job) error {
|
|
||||||
// Store job in OurDB
|
|
||||||
jobData, err := json.Marshal(job)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal job: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update job in OurDB
|
|
||||||
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
|
|
||||||
ID: &job.JobID,
|
|
||||||
Data: jobData,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update job in OurDB: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from Redis
|
|
||||||
if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil {
|
|
||||||
return fmt.Errorf("failed to remove job from Redis: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -19,31 +19,51 @@ type RedisJobManager struct {
|
|||||||
|
|
||||||
// NewRedisJobManager creates a new Redis job manager
|
// NewRedisJobManager creates a new Redis job manager
|
||||||
func NewRedisJobManager(tcpPort int, unixSocketPath string) (*RedisJobManager, error) {
|
func NewRedisJobManager(tcpPort int, unixSocketPath string) (*RedisJobManager, error) {
|
||||||
// Determine network type and address
|
var client *redis.Client
|
||||||
var networkType, addr string
|
var err error
|
||||||
if unixSocketPath != "" {
|
|
||||||
networkType = "unix"
|
|
||||||
addr = unixSocketPath
|
|
||||||
} else {
|
|
||||||
networkType = "tcp"
|
|
||||||
addr = fmt.Sprintf("localhost:%d", tcpPort)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create Redis client
|
// Try Unix socket first if provided
|
||||||
client := redis.NewClient(&redis.Options{
|
if unixSocketPath != "" {
|
||||||
Network: networkType,
|
log.Printf("Attempting to connect to Redis via Unix socket: %s", unixSocketPath)
|
||||||
Addr: addr,
|
client = redis.NewClient(&redis.Options{
|
||||||
|
Network: "unix",
|
||||||
|
Addr: unixSocketPath,
|
||||||
DB: 0,
|
DB: 0,
|
||||||
DialTimeout: 5 * time.Second,
|
DialTimeout: 2 * time.Second,
|
||||||
ReadTimeout: 5 * time.Second,
|
ReadTimeout: 5 * time.Second,
|
||||||
WriteTimeout: 5 * time.Second,
|
WriteTimeout: 5 * time.Second,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Test connection
|
// Test connection
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
_, err := client.Ping(ctx).Result()
|
_, pingErr := client.Ping(ctx).Result()
|
||||||
if err != nil {
|
if pingErr != nil {
|
||||||
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
|
log.Printf("Failed to connect to Redis via Unix socket: %v, falling back to TCP", pingErr)
|
||||||
|
// Close the failed client
|
||||||
|
client.Close()
|
||||||
|
err = pingErr // Update the outer err variable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If Unix socket connection failed or wasn't provided, use TCP
|
||||||
|
if unixSocketPath == "" || err != nil {
|
||||||
|
tcpAddr := fmt.Sprintf("localhost:%d", tcpPort)
|
||||||
|
log.Printf("Connecting to Redis via TCP: %s", tcpAddr)
|
||||||
|
client = redis.NewClient(&redis.Options{
|
||||||
|
Network: "tcp",
|
||||||
|
Addr: tcpAddr,
|
||||||
|
DB: 0,
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
ReadTimeout: 5 * time.Second,
|
||||||
|
WriteTimeout: 5 * time.Second,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test connection
|
||||||
|
ctx := context.Background()
|
||||||
|
_, pingErr := client.Ping(ctx).Result()
|
||||||
|
if pingErr != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to Redis: %w", pingErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &RedisJobManager{
|
return &RedisJobManager{
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
# Create necessary directories
|
# Create necessary directories
|
||||||
mkdir -p data/jobsdb
|
mkdir -p data/jobsdb
|
||||||
|
mkdir -p bin
|
||||||
|
|
||||||
# Build the job test
|
# Build the job test
|
||||||
echo "Building job test..."
|
echo "Building job test..."
|
||||||
|
Loading…
Reference in New Issue
Block a user