...
This commit is contained in:
348
pkg/servers/ui/models/job_manager.go
Normal file
348
pkg/servers/ui/models/job_manager.go
Normal file
@@ -0,0 +1,348 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.ourworld.tf/herocode/heroagent/pkg/herojobs"
|
||||
)
|
||||
|
||||
// JobManager provides an interface for job management operations
|
||||
type JobManager interface {
|
||||
// GetAllJobs returns all jobs
|
||||
GetAllJobs() ([]*JobInfo, error)
|
||||
// GetJobsByCircle returns jobs for a specific circle
|
||||
GetJobsByCircle(circleID string) ([]*JobInfo, error)
|
||||
// GetJobsByTopic returns jobs for a specific topic
|
||||
GetJobsByTopic(topic string) ([]*JobInfo, error)
|
||||
// GetJobsByStatus returns jobs with a specific status
|
||||
GetJobsByStatus(status herojobs.JobStatus) ([]*JobInfo, error)
|
||||
// GetJob returns a specific job by ID
|
||||
GetJob(jobID uint32) (*JobInfo, error)
|
||||
}
|
||||
|
||||
// JobInfo represents job information for the UI
|
||||
type JobInfo struct {
|
||||
JobID uint32 `json:"jobid"`
|
||||
SessionKey string `json:"sessionkey"`
|
||||
CircleID string `json:"circleid"`
|
||||
Topic string `json:"topic"`
|
||||
ParamsType herojobs.ParamsType `json:"params_type"`
|
||||
Status herojobs.JobStatus `json:"status"`
|
||||
TimeScheduled time.Time `json:"time_scheduled"`
|
||||
TimeStart time.Time `json:"time_start"`
|
||||
TimeEnd time.Time `json:"time_end"`
|
||||
Duration string `json:"duration"`
|
||||
Error string `json:"error"`
|
||||
HasError bool `json:"has_error"`
|
||||
}
|
||||
|
||||
// HeroJobManager implements JobManager interface using herojobs package
|
||||
type HeroJobManager struct {
|
||||
factory *herojobs.Factory
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// NewHeroJobManager creates a new HeroJobManager
|
||||
func NewHeroJobManager(redisURL string) (*HeroJobManager, error) {
|
||||
factory, err := herojobs.NewFactory(redisURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create job factory: %w", err)
|
||||
}
|
||||
|
||||
return &HeroJobManager{
|
||||
factory: factory,
|
||||
ctx: context.Background(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetAllJobs returns all jobs from Redis
|
||||
func (jm *HeroJobManager) GetAllJobs() ([]*JobInfo, error) {
|
||||
// This is a simplified implementation
|
||||
// In a real-world scenario, you would need to:
|
||||
// 1. Get all circles and topics
|
||||
// 2. For each circle/topic combination, get all jobs
|
||||
// 3. Combine the results
|
||||
|
||||
// For now, we'll just list all job IDs from Redis
|
||||
jobIDs, err := jm.factory.ListJobs(jm.ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list jobs: %w", err)
|
||||
}
|
||||
|
||||
jobs := make([]*JobInfo, 0, len(jobIDs))
|
||||
for _, jobID := range jobIDs {
|
||||
// Extract job ID from the key
|
||||
// Assuming the key format is "job:<id>"
|
||||
jobData, err := jm.factory.GetJob(jm.ctx, jobID)
|
||||
if err != nil {
|
||||
log.Printf("Warning: failed to get job %s: %v", jobID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse job data
|
||||
job, err := herojobs.NewJobFromJSON(jobData)
|
||||
if err != nil {
|
||||
log.Printf("Warning: failed to parse job data for %s: %v", jobID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert to JobInfo
|
||||
jobInfo := convertToJobInfo(job)
|
||||
jobs = append(jobs, jobInfo)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// GetJobsByCircle returns jobs for a specific circle
|
||||
func (jm *HeroJobManager) GetJobsByCircle(circleID string) ([]*JobInfo, error) {
|
||||
// Implementation would filter jobs by circle ID
|
||||
// For now, return all jobs and filter in memory
|
||||
allJobs, err := jm.GetAllJobs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filteredJobs := make([]*JobInfo, 0)
|
||||
for _, job := range allJobs {
|
||||
if job.CircleID == circleID {
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
}
|
||||
|
||||
return filteredJobs, nil
|
||||
}
|
||||
|
||||
// GetJobsByTopic returns jobs for a specific topic
|
||||
func (jm *HeroJobManager) GetJobsByTopic(topic string) ([]*JobInfo, error) {
|
||||
// Implementation would filter jobs by topic
|
||||
// For now, return all jobs and filter in memory
|
||||
allJobs, err := jm.GetAllJobs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filteredJobs := make([]*JobInfo, 0)
|
||||
for _, job := range allJobs {
|
||||
if job.Topic == topic {
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
}
|
||||
|
||||
return filteredJobs, nil
|
||||
}
|
||||
|
||||
// GetJobsByStatus returns jobs with a specific status
|
||||
func (jm *HeroJobManager) GetJobsByStatus(status herojobs.JobStatus) ([]*JobInfo, error) {
|
||||
// Implementation would filter jobs by status
|
||||
// For now, return all jobs and filter in memory
|
||||
allJobs, err := jm.GetAllJobs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filteredJobs := make([]*JobInfo, 0)
|
||||
for _, job := range allJobs {
|
||||
if job.Status == status {
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
}
|
||||
|
||||
return filteredJobs, nil
|
||||
}
|
||||
|
||||
// GetJob returns a specific job by ID
|
||||
func (jm *HeroJobManager) GetJob(jobID uint32) (*JobInfo, error) {
|
||||
// Implementation would get a specific job by ID
|
||||
// This is a placeholder implementation
|
||||
allJobs, err := jm.GetAllJobs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, job := range allJobs {
|
||||
if job.JobID == jobID {
|
||||
return job, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("job not found: %d", jobID)
|
||||
}
|
||||
|
||||
// convertToJobInfo converts a herojobs.Job to a JobInfo
|
||||
func convertToJobInfo(job *herojobs.Job) *JobInfo {
|
||||
// Convert Unix timestamps to time.Time
|
||||
timeScheduled := time.Unix(job.TimeScheduled, 0)
|
||||
timeStart := time.Time{}
|
||||
if job.TimeStart > 0 {
|
||||
timeStart = time.Unix(job.TimeStart, 0)
|
||||
}
|
||||
timeEnd := time.Time{}
|
||||
if job.TimeEnd > 0 {
|
||||
timeEnd = time.Unix(job.TimeEnd, 0)
|
||||
}
|
||||
|
||||
// Calculate duration
|
||||
var duration string
|
||||
if job.TimeStart > 0 {
|
||||
if job.TimeEnd > 0 {
|
||||
// Job has completed
|
||||
duration = time.Unix(job.TimeEnd, 0).Sub(time.Unix(job.TimeStart, 0)).String()
|
||||
} else {
|
||||
// Job is still running
|
||||
duration = time.Since(time.Unix(job.TimeStart, 0)).String()
|
||||
}
|
||||
} else {
|
||||
duration = "Not started"
|
||||
}
|
||||
|
||||
return &JobInfo{
|
||||
JobID: job.JobID,
|
||||
SessionKey: job.SessionKey,
|
||||
CircleID: job.CircleID,
|
||||
Topic: job.Topic,
|
||||
ParamsType: job.ParamsType,
|
||||
Status: job.Status,
|
||||
TimeScheduled: timeScheduled,
|
||||
TimeStart: timeStart,
|
||||
TimeEnd: timeEnd,
|
||||
Duration: duration,
|
||||
Error: job.Error,
|
||||
HasError: job.Error != "",
|
||||
}
|
||||
}
|
||||
|
||||
// MockJobManager is a mock implementation of JobManager for testing
|
||||
type MockJobManager struct{}
|
||||
|
||||
// NewMockJobManager creates a new MockJobManager
|
||||
func NewMockJobManager() *MockJobManager {
|
||||
return &MockJobManager{}
|
||||
}
|
||||
|
||||
// GetAllJobs returns mock jobs
|
||||
func (m *MockJobManager) GetAllJobs() ([]*JobInfo, error) {
|
||||
return generateMockJobs(), nil
|
||||
}
|
||||
|
||||
// GetJobsByCircle returns mock jobs for a circle
|
||||
func (m *MockJobManager) GetJobsByCircle(circleID string) ([]*JobInfo, error) {
|
||||
allJobs := generateMockJobs()
|
||||
filteredJobs := make([]*JobInfo, 0)
|
||||
for _, job := range allJobs {
|
||||
if job.CircleID == circleID {
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
}
|
||||
return filteredJobs, nil
|
||||
}
|
||||
|
||||
// GetJobsByTopic returns mock jobs for a topic
|
||||
func (m *MockJobManager) GetJobsByTopic(topic string) ([]*JobInfo, error) {
|
||||
allJobs := generateMockJobs()
|
||||
filteredJobs := make([]*JobInfo, 0)
|
||||
for _, job := range allJobs {
|
||||
if job.Topic == topic {
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
}
|
||||
return filteredJobs, nil
|
||||
}
|
||||
|
||||
// GetJobsByStatus returns mock jobs with a status
|
||||
func (m *MockJobManager) GetJobsByStatus(status herojobs.JobStatus) ([]*JobInfo, error) {
|
||||
allJobs := generateMockJobs()
|
||||
filteredJobs := make([]*JobInfo, 0)
|
||||
for _, job := range allJobs {
|
||||
if job.Status == status {
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
}
|
||||
return filteredJobs, nil
|
||||
}
|
||||
|
||||
// GetJob returns a mock job by ID
|
||||
func (m *MockJobManager) GetJob(jobID uint32) (*JobInfo, error) {
|
||||
allJobs := generateMockJobs()
|
||||
for _, job := range allJobs {
|
||||
if job.JobID == jobID {
|
||||
return job, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("job not found: %d", jobID)
|
||||
}
|
||||
|
||||
// generateMockJobs generates mock jobs for testing
|
||||
func generateMockJobs() []*JobInfo {
|
||||
now := time.Now()
|
||||
return []*JobInfo{
|
||||
{
|
||||
JobID: 1,
|
||||
CircleID: "circle1",
|
||||
Topic: "email",
|
||||
ParamsType: herojobs.ParamsTypeHeroScript,
|
||||
Status: herojobs.JobStatusDone,
|
||||
TimeScheduled: now.Add(-30 * time.Minute),
|
||||
TimeStart: now.Add(-29 * time.Minute),
|
||||
TimeEnd: now.Add(-28 * time.Minute),
|
||||
Duration: "1m0s",
|
||||
Error: "",
|
||||
HasError: false,
|
||||
},
|
||||
{
|
||||
JobID: 2,
|
||||
CircleID: "circle1",
|
||||
Topic: "backup",
|
||||
ParamsType: herojobs.ParamsTypeOpenRPC,
|
||||
Status: herojobs.JobStatusActive,
|
||||
TimeScheduled: now.Add(-15 * time.Minute),
|
||||
TimeStart: now.Add(-14 * time.Minute),
|
||||
TimeEnd: time.Time{},
|
||||
Duration: "14m0s",
|
||||
Error: "",
|
||||
HasError: false,
|
||||
},
|
||||
{
|
||||
JobID: 3,
|
||||
CircleID: "circle2",
|
||||
Topic: "sync",
|
||||
ParamsType: herojobs.ParamsTypeRhaiScript,
|
||||
Status: herojobs.JobStatusError,
|
||||
TimeScheduled: now.Add(-45 * time.Minute),
|
||||
TimeStart: now.Add(-44 * time.Minute),
|
||||
TimeEnd: now.Add(-43 * time.Minute),
|
||||
Duration: "1m0s",
|
||||
Error: "Failed to connect to remote server",
|
||||
HasError: true,
|
||||
},
|
||||
{
|
||||
JobID: 4,
|
||||
CircleID: "circle2",
|
||||
Topic: "email",
|
||||
ParamsType: herojobs.ParamsTypeHeroScript,
|
||||
Status: herojobs.JobStatusNew,
|
||||
TimeScheduled: now.Add(-5 * time.Minute),
|
||||
TimeStart: time.Time{},
|
||||
TimeEnd: time.Time{},
|
||||
Duration: "Not started",
|
||||
Error: "",
|
||||
HasError: false,
|
||||
},
|
||||
{
|
||||
JobID: 5,
|
||||
CircleID: "circle3",
|
||||
Topic: "ai",
|
||||
ParamsType: herojobs.ParamsTypeAI,
|
||||
Status: herojobs.JobStatusDone,
|
||||
TimeScheduled: now.Add(-60 * time.Minute),
|
||||
TimeStart: now.Add(-59 * time.Minute),
|
||||
TimeEnd: now.Add(-40 * time.Minute),
|
||||
Duration: "19m0s",
|
||||
Error: "",
|
||||
HasError: false,
|
||||
},
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user