diff --git a/cmd/jobtest/main.go b/cmd/jobtest/main.go index 70fdcce..18f5763 100644 --- a/cmd/jobtest/main.go +++ b/cmd/jobtest/main.go @@ -19,8 +19,8 @@ func main() { // Customize configuration if needed config.Redis.TCPPort = 6379 - config.Redis.UnixSocketPath = "/tmp/redis.sock" - config.Jobs.OurDBPath = "./data/jobsdb" + config.Redis.UnixSocketPath = "" // Use TCP connection only + config.Jobs.OurDBPath = "/tmp/jobsdb" config.Jobs.WorkerCount = 3 config.Jobs.QueuePollInterval = 200 * time.Millisecond diff --git a/pkg/servers/heroagent/README.md b/pkg/servers/heroagent/README.md index 2a620d6..a87fa8b 100644 --- a/pkg/servers/heroagent/README.md +++ b/pkg/servers/heroagent/README.md @@ -22,11 +22,14 @@ The job system follows a specific flow: 1. **Job Creation**: - When a job is created, it's stored in both OurDB and Redis - OurDB provides persistent storage with history tracking - - Redis provides fast access and queuing capabilities + - Redis stores the job data and adds the job ID to a queue for processing + - Each job is stored in Redis using a key pattern: `herojobs::` + - Each job ID is added to a queue using a key pattern: `heroqueue:` 2. **Job Processing**: - Workers continuously poll Redis queues for new jobs - - When a job is found, it's updated to "active" status in both OurDB and Redis + - When a job is found, it's fetched from Redis and updated to "active" status + - The updated job is stored in both OurDB and Redis - The job is processed based on its parameters 3. **Job Completion**: @@ -34,6 +37,60 @@ The job system follows a specific flow: - The job is removed from Redis to keep only active jobs in memory - This approach ensures efficient memory usage while maintaining a complete history +### Data Flow Diagram + +``` +Job Creation: + ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Client │────▶│ OurDB │ │ Redis │ + └─────────┘ └────┬────┘ └────┬────┘ + │ │ + │ Store Job │ Store Job + │ │ + ▼ ▼ + ┌─────────┐ ┌─────────┐ + │ Job Data│ │ Job Data│ + └─────────┘ └─────────┘ + │ + │ Add to Queue + │ + ▼ + ┌─────────┐ + │ Queue │ + └─────────┘ + +Job Processing: + ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Worker │────▶│ Redis │────▶│ OurDB │ + └─────────┘ └────┬────┘ └────┬────┘ + │ │ + │ Fetch Job │ Update Job + │ from Queue │ + ▼ ▼ + ┌─────────┐ ┌─────────┐ + │ Job Data│ │ Job Data│ + └─────────┘ └─────────┘ + │ + │ Process Job + │ + ▼ + ┌─────────┐ + │ Result │ + └─────────┘ + +Job Completion: + ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Worker │────▶│ OurDB │ │ Redis │ + └─────────┘ └────┬────┘ └────┬────┘ + │ │ + │ Update Job │ Remove Job + │ │ + ▼ ▼ + ┌─────────┐ ┌─────────┐ + │ Job Data│ │ ✓ │ + └─────────┘ └─────────┘ +``` + ### Components - **JobManager**: Coordinates job operations between OurDB and Redis diff --git a/pkg/servers/heroagent/config.go b/pkg/servers/heroagent/config.go index 099c2c0..770feab 100644 --- a/pkg/servers/heroagent/config.go +++ b/pkg/servers/heroagent/config.go @@ -62,8 +62,8 @@ type JobsConfig struct { func DefaultConfig() Config { return Config{ Redis: RedisConfig{ - TCPPort: 6378, - UnixSocketPath: "/tmp/redis.sock", + TCPPort: 6379, + UnixSocketPath: "", // Empty string means use TCP only }, WebDAV: WebDAVConfig{ Config: webdavserver.DefaultConfig(), diff --git a/pkg/servers/heroagent/jobmanager.go b/pkg/servers/heroagent/jobmanager.go new file mode 100644 index 0000000..547f7be --- /dev/null +++ b/pkg/servers/heroagent/jobmanager.go @@ -0,0 +1,351 @@ +package heroagent + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" + + "git.ourworld.tf/herocode/heroagent/pkg/data/ourdb" +) + +// JobManager handles job management between OurDB and Redis +type JobManager struct { + config JobsConfig + ourDB *ourdb.OurDB + redisConn *RedisConnection + redisMgr *RedisJobManager + workers []*JobWorker + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewJobManager creates a new job manager +func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) { + // Create OurDB directory if it doesn't exist + if err := os.MkdirAll(config.OurDBPath, 0755); err != nil { + return nil, fmt.Errorf("failed to create OurDB directory: %w", err) + } + + // Initialize OurDB + ourDBConfig := ourdb.DefaultConfig() + ourDBConfig.Path = config.OurDBPath + ourDBConfig.IncrementalMode = true + + db, err := ourdb.New(ourDBConfig) + if err != nil { + return nil, fmt.Errorf("failed to create OurDB: %w", err) + } + + // Initialize Redis job manager + redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath) + if err != nil { + // Close OurDB before returning error + if closeErr := db.Close(); closeErr != nil { + log.Printf("Warning: failed to close OurDB: %v", closeErr) + } + return nil, fmt.Errorf("failed to create Redis job manager: %w", err) + } + + // Create context with cancel + ctx, cancel := context.WithCancel(context.Background()) + + // Create job manager + jobMgr := &JobManager{ + config: config, + ourDB: db, + redisConn: redisConn, + redisMgr: redisMgr, + ctx: ctx, + cancel: cancel, + } + + return jobMgr, nil +} + +// Start starts the job manager +func (jm *JobManager) Start() error { + log.Println("Starting job manager...") + + // Start workers + jm.workers = make([]*JobWorker, jm.config.WorkerCount) + for i := 0; i < jm.config.WorkerCount; i++ { + worker := &JobWorker{ + id: i, + jobMgr: jm, + ctx: jm.ctx, + wg: &jm.wg, + } + jm.workers[i] = worker + jm.startWorker(worker) + } + + log.Printf("Job manager started with %d workers", jm.config.WorkerCount) + return nil +} + +// Stop stops the job manager +func (jm *JobManager) Stop() error { + log.Println("Stopping job manager...") + + // Signal all workers to stop + jm.cancel() + + // Wait for all workers to finish + jm.wg.Wait() + + // Close Redis job manager + if jm.redisMgr != nil { + if err := jm.redisMgr.Close(); err != nil { + log.Printf("Warning: failed to close Redis job manager: %v", err) + } + } + + // Close OurDB + if err := jm.ourDB.Close(); err != nil { + // Log the error but don't fail the shutdown + log.Printf("Warning: failed to close OurDB: %v", err) + } + + log.Println("Job manager stopped") + return nil +} + +// startWorker starts a worker +func (jm *JobManager) startWorker(worker *JobWorker) { + jm.wg.Add(1) + go func() { + defer jm.wg.Done() + worker.run() + }() +} + +// CreateJob creates a new job +func (jm *JobManager) CreateJob(topic, params string) (*Job, error) { + // Create new job + job := &Job{ + Topic: topic, + Params: params, + Status: JobStatusNew, + TimeScheduled: time.Now().Unix(), + } + + // Store job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return nil, fmt.Errorf("failed to marshal job: %w", err) + } + + // Add job to OurDB with auto-incremented ID + id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{ + Data: jobData, + }) + if err != nil { + return nil, fmt.Errorf("failed to store job in OurDB: %w", err) + } + + // Update job with assigned ID + job.JobID = id + + // Store job in Redis and add to queue + if err := jm.redisMgr.EnqueueJob(job); err != nil { + return nil, fmt.Errorf("failed to store job in Redis: %w", err) + } + + log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID) + return job, nil +} + +// GetJob retrieves a job by ID +func (jm *JobManager) GetJob(jobID uint32) (*Job, error) { + // Get job from OurDB + jobData, err := jm.ourDB.Get(jobID) + if err != nil { + return nil, fmt.Errorf("failed to get job from OurDB: %w", err) + } + + // Parse job data + job := &Job{} + if err := json.Unmarshal(jobData, job); err != nil { + return nil, fmt.Errorf("failed to unmarshal job data: %w", err) + } + + return job, nil +} + +// UpdateJobStatus updates the status of a job +func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error { + // Get job from OurDB + job, err := jm.GetJob(jobID) + if err != nil { + return err + } + + // Update status + job.Status = status + + // Update timestamps based on status + now := time.Now().Unix() + if status == JobStatusActive && job.TimeStart == 0 { + job.TimeStart = now + } else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 { + job.TimeEnd = now + } + + // Store updated job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &jobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // If job is done or has error, remove from Redis + if status == JobStatusDone || status == JobStatusError { + if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { + log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) + } + } else { + // Otherwise, update in Redis + if err := jm.redisMgr.UpdateJobStatus(job); err != nil { + log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err) + } + } + + return nil +} + +// CompleteJob marks a job as completed +func (jm *JobManager) CompleteJob(jobID uint32, result string) error { + // Get job from OurDB + job, err := jm.GetJob(jobID) + if err != nil { + return err + } + + // Update job + job.Status = JobStatusDone + job.TimeEnd = time.Now().Unix() + job.Result = result + + // Store updated job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &jobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Remove from Redis + if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { + log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) + } + + log.Printf("Job %d completed and removed from Redis", jobID) + return nil +} + +// FailJob marks a job as failed +func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error { + // Get job from OurDB + job, err := jm.GetJob(jobID) + if err != nil { + return err + } + + // Update job + job.Status = JobStatusError + job.TimeEnd = time.Now().Unix() + job.Error = errorMsg + + // Store updated job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &jobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Remove from Redis + if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { + log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) + } + + log.Printf("Job %d failed and removed from Redis", jobID) + return nil +} + +// updateJobInBothStores updates a job in both OurDB and Redis +func (jm *JobManager) updateJobInBothStores(job *Job) error { + // Store job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &job.JobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Update job in Redis + if err := jm.redisMgr.UpdateJobStatus(job); err != nil { + return fmt.Errorf("failed to update job in Redis: %w", err) + } + + return nil +} + +// completeJobProcessing updates a completed job in OurDB and removes it from Redis +func (jm *JobManager) completeJobProcessing(job *Job) error { + // Store job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &job.JobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Remove from Redis + if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil { + return fmt.Errorf("failed to remove job from Redis: %w", err) + } + + return nil +} diff --git a/pkg/servers/heroagent/jobs.go b/pkg/servers/heroagent/jobs.go index 8882fd1..11c5437 100644 --- a/pkg/servers/heroagent/jobs.go +++ b/pkg/servers/heroagent/jobs.go @@ -2,14 +2,10 @@ package heroagent import ( "context" - "encoding/json" "fmt" "log" - "os" "sync" "time" - - "git.ourworld.tf/herocode/heroagent/pkg/data/ourdb" ) // JobStatus represents the status of a job @@ -39,18 +35,6 @@ type Job struct { Result string `json:"result"` } -// JobManager handles job management between OurDB and Redis -type JobManager struct { - config JobsConfig - ourDB *ourdb.OurDB - redisConn *RedisConnection - redisMgr *RedisJobManager - workers []*JobWorker - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup -} - // RedisConnection wraps Redis connection details type RedisConnection struct { TCPPort int @@ -65,105 +49,6 @@ type JobWorker struct { wg *sync.WaitGroup } -// NewJobManager creates a new job manager -func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) { - // Create OurDB directory if it doesn't exist - if err := os.MkdirAll(config.OurDBPath, 0755); err != nil { - return nil, fmt.Errorf("failed to create OurDB directory: %w", err) - } - - // Initialize OurDB - ourDBConfig := ourdb.DefaultConfig() - ourDBConfig.Path = config.OurDBPath - ourDBConfig.IncrementalMode = true - - db, err := ourdb.New(ourDBConfig) - if err != nil { - return nil, fmt.Errorf("failed to create OurDB: %w", err) - } - - // Create context with cancel - ctx, cancel := context.WithCancel(context.Background()) - - // Initialize Redis job manager - redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath) - if err != nil { - // Close OurDB before returning error - if closeErr := db.Close(); closeErr != nil { - log.Printf("Warning: failed to close OurDB: %v", closeErr) - } - return nil, fmt.Errorf("failed to create Redis job manager: %w", err) - } - - // Create job manager - jobMgr := &JobManager{ - config: config, - ourDB: db, - redisConn: redisConn, - redisMgr: redisMgr, - ctx: ctx, - cancel: cancel, - } - - return jobMgr, nil -} - -// Start starts the job manager -func (jm *JobManager) Start() error { - log.Println("Starting job manager...") - - // Start workers - jm.workers = make([]*JobWorker, jm.config.WorkerCount) - for i := 0; i < jm.config.WorkerCount; i++ { - worker := &JobWorker{ - id: i, - jobMgr: jm, - ctx: jm.ctx, - wg: &jm.wg, - } - jm.workers[i] = worker - jm.startWorker(worker) - } - - log.Printf("Job manager started with %d workers", jm.config.WorkerCount) - return nil -} - -// Stop stops the job manager -func (jm *JobManager) Stop() error { - log.Println("Stopping job manager...") - - // Signal all workers to stop - jm.cancel() - - // Wait for all workers to finish - jm.wg.Wait() - - // Close Redis job manager - if jm.redisMgr != nil { - if err := jm.redisMgr.Close(); err != nil { - log.Printf("Warning: failed to close Redis job manager: %v", err) - } - } - - // Close OurDB - if err := jm.ourDB.Close(); err != nil { - return fmt.Errorf("failed to close OurDB: %w", err) - } - - log.Println("Job manager stopped") - return nil -} - -// startWorker starts a worker -func (jm *JobManager) startWorker(worker *JobWorker) { - jm.wg.Add(1) - go func() { - defer jm.wg.Done() - worker.run() - }() -} - // run is the main worker loop func (w *JobWorker) run() { log.Printf("Worker %d started", w.id) @@ -244,229 +129,3 @@ func (w *JobWorker) processJob(job *Job) error { log.Printf("Worker %d completed job %d", w.id, job.JobID) return nil } - -// CreateJob creates a new job -func (jm *JobManager) CreateJob(topic, params string) (*Job, error) { - // Create new job - job := &Job{ - Topic: topic, - Params: params, - Status: JobStatusNew, - TimeScheduled: time.Now().Unix(), - } - - // Store job in OurDB - jobData, err := json.Marshal(job) - if err != nil { - return nil, fmt.Errorf("failed to marshal job: %w", err) - } - - // Add job to OurDB with auto-incremented ID - id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{ - Data: jobData, - }) - if err != nil { - return nil, fmt.Errorf("failed to store job in OurDB: %w", err) - } - - // Update job with assigned ID - job.JobID = id - - // Store job in Redis - if err := jm.redisMgr.EnqueueJob(job); err != nil { - return nil, fmt.Errorf("failed to store job in Redis: %w", err) - } - - log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID) - return job, nil -} - -// GetJob retrieves a job by ID -func (jm *JobManager) GetJob(jobID uint32) (*Job, error) { - // Get job from OurDB - jobData, err := jm.ourDB.Get(jobID) - if err != nil { - return nil, fmt.Errorf("failed to get job from OurDB: %w", err) - } - - // Parse job data - job := &Job{} - if err := json.Unmarshal(jobData, job); err != nil { - return nil, fmt.Errorf("failed to unmarshal job data: %w", err) - } - - return job, nil -} - -// UpdateJobStatus updates the status of a job -func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error { - // Get job from OurDB - job, err := jm.GetJob(jobID) - if err != nil { - return err - } - - // Update status - job.Status = status - - // Update timestamps based on status - now := time.Now().Unix() - if status == JobStatusActive && job.TimeStart == 0 { - job.TimeStart = now - } else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 { - job.TimeEnd = now - } - - // Store updated job in OurDB - jobData, err := json.Marshal(job) - if err != nil { - return fmt.Errorf("failed to marshal job: %w", err) - } - - // Update job in OurDB - _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ - ID: &jobID, - Data: jobData, - }) - if err != nil { - return fmt.Errorf("failed to update job in OurDB: %w", err) - } - - // If job is done or has error, remove from Redis - if status == JobStatusDone || status == JobStatusError { - if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { - log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) - } - } else { - // Otherwise, update in Redis - if err := jm.redisMgr.UpdateJobStatus(job); err != nil { - log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err) - } - } - - return nil -} - -// CompleteJob marks a job as completed -func (jm *JobManager) CompleteJob(jobID uint32, result string) error { - // Get job from OurDB - job, err := jm.GetJob(jobID) - if err != nil { - return err - } - - // Update job - job.Status = JobStatusDone - job.TimeEnd = time.Now().Unix() - job.Result = result - - // Store updated job in OurDB - jobData, err := json.Marshal(job) - if err != nil { - return fmt.Errorf("failed to marshal job: %w", err) - } - - // Update job in OurDB - _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ - ID: &jobID, - Data: jobData, - }) - if err != nil { - return fmt.Errorf("failed to update job in OurDB: %w", err) - } - - // Remove from Redis - if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { - log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) - } - - log.Printf("Job %d completed and removed from Redis", jobID) - return nil -} - -// FailJob marks a job as failed -func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error { - // Get job from OurDB - job, err := jm.GetJob(jobID) - if err != nil { - return err - } - - // Update job - job.Status = JobStatusError - job.TimeEnd = time.Now().Unix() - job.Error = errorMsg - - // Store updated job in OurDB - jobData, err := json.Marshal(job) - if err != nil { - return fmt.Errorf("failed to marshal job: %w", err) - } - - // Update job in OurDB - _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ - ID: &jobID, - Data: jobData, - }) - if err != nil { - return fmt.Errorf("failed to update job in OurDB: %w", err) - } - - // Remove from Redis - if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { - log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) - } - - log.Printf("Job %d failed and removed from Redis", jobID) - return nil -} - -// updateJobInBothStores updates a job in both OurDB and Redis -func (jm *JobManager) updateJobInBothStores(job *Job) error { - // Store job in OurDB - jobData, err := json.Marshal(job) - if err != nil { - return fmt.Errorf("failed to marshal job: %w", err) - } - - // Update job in OurDB - _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ - ID: &job.JobID, - Data: jobData, - }) - if err != nil { - return fmt.Errorf("failed to update job in OurDB: %w", err) - } - - // Update job in Redis - if err := jm.redisMgr.UpdateJobStatus(job); err != nil { - return fmt.Errorf("failed to update job in Redis: %w", err) - } - - return nil -} - -// completeJobProcessing updates a completed job in OurDB and removes it from Redis -func (jm *JobManager) completeJobProcessing(job *Job) error { - // Store job in OurDB - jobData, err := json.Marshal(job) - if err != nil { - return fmt.Errorf("failed to marshal job: %w", err) - } - - // Update job in OurDB - _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ - ID: &job.JobID, - Data: jobData, - }) - if err != nil { - return fmt.Errorf("failed to update job in OurDB: %w", err) - } - - // Remove from Redis - if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil { - return fmt.Errorf("failed to remove job from Redis: %w", err) - } - - return nil -} diff --git a/pkg/servers/heroagent/redis.go b/pkg/servers/heroagent/redis.go index d7012ff..63ec4bb 100644 --- a/pkg/servers/heroagent/redis.go +++ b/pkg/servers/heroagent/redis.go @@ -19,31 +19,51 @@ type RedisJobManager struct { // NewRedisJobManager creates a new Redis job manager func NewRedisJobManager(tcpPort int, unixSocketPath string) (*RedisJobManager, error) { - // Determine network type and address - var networkType, addr string + var client *redis.Client + var err error + + // Try Unix socket first if provided if unixSocketPath != "" { - networkType = "unix" - addr = unixSocketPath - } else { - networkType = "tcp" - addr = fmt.Sprintf("localhost:%d", tcpPort) + log.Printf("Attempting to connect to Redis via Unix socket: %s", unixSocketPath) + client = redis.NewClient(&redis.Options{ + Network: "unix", + Addr: unixSocketPath, + DB: 0, + DialTimeout: 2 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + }) + + // Test connection + ctx := context.Background() + _, pingErr := client.Ping(ctx).Result() + if pingErr != nil { + log.Printf("Failed to connect to Redis via Unix socket: %v, falling back to TCP", pingErr) + // Close the failed client + client.Close() + err = pingErr // Update the outer err variable + } } - // Create Redis client - client := redis.NewClient(&redis.Options{ - Network: networkType, - Addr: addr, - DB: 0, - DialTimeout: 5 * time.Second, - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - }) + // If Unix socket connection failed or wasn't provided, use TCP + if unixSocketPath == "" || err != nil { + tcpAddr := fmt.Sprintf("localhost:%d", tcpPort) + log.Printf("Connecting to Redis via TCP: %s", tcpAddr) + client = redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: tcpAddr, + DB: 0, + DialTimeout: 5 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + }) + } // Test connection ctx := context.Background() - _, err := client.Ping(ctx).Result() - if err != nil { - return nil, fmt.Errorf("failed to connect to Redis: %w", err) + _, pingErr := client.Ping(ctx).Result() + if pingErr != nil { + return nil, fmt.Errorf("failed to connect to Redis: %w", pingErr) } return &RedisJobManager{ diff --git a/scripts/test_jobs.sh b/scripts/test_jobs.sh index 2c6c17e..da1404a 100755 --- a/scripts/test_jobs.sh +++ b/scripts/test_jobs.sh @@ -2,6 +2,7 @@ # Create necessary directories mkdir -p data/jobsdb +mkdir -p bin # Build the job test echo "Building job test..."