diff --git a/cmd/runner.vsh b/cmd/runner.vsh index 12ec76a..48f65d9 100755 --- a/cmd/runner.vsh +++ b/cmd/runner.vsh @@ -33,13 +33,15 @@ if help_requested { mut r := &runner.Runner{ name: runner_name namespace: namespace - redis_conn: redisclient.new( - if redis_url.len > 0 { - redis_url - } else { - '127.0.0.1:6379' - } - )! + client: &runner.Client{ + redis_conn: redisclient.new( + if redis_url.len > 0 { + redis_url + } else { + '127.0.0.1:6379' + } + )! + } } spawn r.run() diff --git a/examples/runner b/examples/runner deleted file mode 100755 index a66ce72..0000000 Binary files a/examples/runner and /dev/null differ diff --git a/examples/runner.vsh b/examples/runner.vsh index 12c7f0f..82a9feb 100755 --- a/examples/runner.vsh +++ b/examples/runner.vsh @@ -1,34 +1,39 @@ #!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run import time -import freeflowuniverse.herolib.baobab.runner +import herocode.runner import freeflowuniverse.herolib.core.redisclient -const namespace = '' +const namespace = 'example' mut r := &runner.Runner{ - name: 'test_runner' - redis_conn: redisclient.new('127.0.0.1:6379')! + name: 'example_runner' + namespace: namespace + client: &runner.Client{ + redis_conn: redisclient.new('127.0.0.1:6379')! + } } spawn r.run() -// job := runner.Job{ -// id: 'test_job_1' -// caller_id: 'test_caller' -// context_id: 'test_context_1' -// runner: 'test_runner' -// executor: 'tmux.session1' -// payload: 'sleep 10\necho "Hello from job 1"\nsleep 10' -// status: .dispatched -// timeout: 30 -// created_at: time.now() -// updated_at: time.now() -// } +job := runner.Job{ + id: 'test_job_1' + caller_id: 'test_caller' + context_id: 'test_context_1' + runner: 'test_runner' + executor: 'tmux.session1' + payload: '!!git.list' + status: .dispatched + timeout: 30 + created_at: time.now() + updated_at: time.now() +} -// mut redis_conn := redisclient.new('127.0.0.1:6379')! -// job.store_in_redis(mut redis_conn, namespace)! -// mut runner_q := redis_conn.queue_get(r.queue_key()!) -// runner_q.add(job.id)! +mut client := runner.Client{ + redis_conn: redisclient.new('127.0.0.1:6379')! +} +client.store_in_redis(job.id, job)! +mut runner_q := client.redis_conn.queue_get(r.queue_key()!) +runner_q.add(job.id)! for {} \ No newline at end of file diff --git a/examples/test_runner_rpc.vsh b/examples/test_runner_rpc.vsh deleted file mode 100644 index 5b9873d..0000000 --- a/examples/test_runner_rpc.vsh +++ /dev/null @@ -1,107 +0,0 @@ -#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run - -import freeflowuniverse.herolib.core.redisclient -import freeflowuniverse.herolib.baobab.runner -import time -import log - -// Test script to send RPC calls over Redis to test the runner -fn main() { - log.info('Starting runner RPC test script') - - // Connect to Redis - mut redis := redisclient.new('127.0.0.1:6379')! - - // Test runner configuration - runner_name := 'test_runner' - namespace := '' - - // Create test jobs - test_jobs := [ - runner.Job{ - id: 'test_job_1' - caller_id: 'test_caller' - context_id: 'test_context_1' - runner: runner_name - executor: 'tmux.session1' - payload: 'echo "Hello from job 1"' - status: .dispatched - timeout: 30 - created_at: time.now() - updated_at: time.now() - }, - runner.Job{ - id: 'test_job_2' - caller_id: 'test_caller' - context_id: 'test_context_2' - runner: runner_name - executor: 'tmux.session2.window1' - payload: 'ls -la && echo "Job 2 completed"' - status: .dispatched - timeout: 30 - created_at: time.now() - updated_at: time.now() - }, - runner.Job{ - id: 'test_job_3' - caller_id: 'test_caller' - context_id: 'test_context_3' - runner: runner_name - executor: 'tmux.session3.window1.pane1' - payload: 'date && echo "Current time from job 3"' - status: .dispatched - timeout: 30 - created_at: time.now() - updated_at: time.now() - } - ] - - log.info('Storing ${test_jobs.len} test jobs in Redis and dispatching to runner queue') - - // Store jobs in Redis and dispatch them to the runner queue - for job in test_jobs { - // Store job data in Redis - job.store_in_redis(mut redis, namespace) or { - log.error('Failed to store job ${job.id}: ${err}') - continue - } - - // Dispatch job to runner queue by pushing job ID to the queue - queue_key := if namespace.len > 0 { - "${namespace}:runner:${runner_name}" - } else { - "runner:${runner_name}" - } - - redis.rpush(queue_key, job.id) or { - log.error('Failed to dispatch job ${job.id} to queue ${queue_key}: ${err}') - continue - } - - log.info('Dispatched job ${job.id} to queue ${queue_key}') - - // Small delay between jobs - time.sleep(1 * time.second) - } - - log.info('All test jobs dispatched. Monitoring job status...') - - // Monitor job status for a while - for i in 0..30 { // Monitor for 30 seconds - log.info('--- Status check ${i + 1} ---') - - for job in test_jobs { - loaded_job := runner.load_from_redis(mut redis, job.id, namespace) or { - log.error('Failed to load job ${job.id}: ${err}') - continue - } - - log.info('Job ${loaded_job.id}: status=${loaded_job.status}, result="${loaded_job.result}", error="${loaded_job.error}"') - } - - time.sleep(1 * time.second) - } - - log.info('Test completed. Check tmux sessions to see if commands were executed.') - log.info('You can run "tmux list-sessions" to see created sessions.') -} diff --git a/src/client.v b/src/client.v new file mode 100644 index 0000000..fd3e74d --- /dev/null +++ b/src/client.v @@ -0,0 +1,126 @@ +module runner + +import rand +import time +import json +import freeflowuniverse.herolib.core.redisclient + +pub struct Client { + namespace string +pub mut: + redis_conn &redisclient.Redis +} + +fn (client Client) job_key(job_id string) ! string { + return if client.namespace.len > 0 { + '${client.namespace}:job:${job_id}' + } else { + 'job:${job_id}' + } +} + +fn (client Client) jobs_key() ! string { + return if client.namespace.len > 0 { + '${client.namespace}:job' + } else { + 'job' + } +} + +// Store this job in Redis +pub fn (mut client Client) store_in_redis(job_id string, job Job) ! { + job_key := client.job_key(job_id)! + + // Store job as JSON + job_json := json.encode(job) + client.redis_conn.hset(job_key, 'data', job_json) or { return error('Failed to store job ${job.id}: ${err}') } +} + +// Load a job from Redis by ID +pub fn (mut client Client) load_from_redis(job_id string) !Job { + job_key := client.job_key(job_id)! + + // Get job JSON from Redis + job_json := client.redis_conn.hget(job_key, 'data') or { + return error('Failed to load job ${job_id} from Redis: ${err}') + } + + if job_json.len == 0 { + return error('Job ${job_id} not found in Redis') + } + + // Parse job from JSON + job := json.decode(Job, job_json) or { + return error('Failed to decode job ${job_id} JSON: ${err}') + } + + return job +} + +// Update job status in Redis +pub fn (mut client Client) update_job_status(job_id string, status JobStatus) ! { + // Load job, update status, and store back + mut job := client.load_from_redis(job_id)! + job.status = status + job.updated_at = time.now() + client.store_in_redis(job_id, job)! +} + +// Get job status from Redis +pub fn (mut client Client) get_status(job_id string) !JobStatus { + job := client.load_from_redis(job_id)! + return job.status +} + +// Set job result in Redis +pub fn (mut client Client) set_result(job_id string, result string) ! { + // Load job, update result, and store back + mut job := client.load_from_redis(job_id)! + job.result = result + job.updated_at = time.now() + client.store_in_redis(job_id, job)! +} + +// Set job error in Redis +pub fn (mut client Client) set_error(job_id string, error_msg string) ! { + // Load job, update error, and store back + mut job := client.load_from_redis(job_id)! + job.error = error_msg + job.status = .error + job.updated_at = time.now() + client.store_in_redis(job_id, job)! +} + +// Delete job from Redis +pub fn (mut client Client) delete_from_redis(job_id string) ! { + job_key := client.job_key(job_id)! + client.redis_conn.del(job_key) or { + return error('Failed to delete job ${job_id} from Redis: ${err}') + } +} + +// List all job IDs from Redis +pub fn (mut client Client) list_all_job_ids() ![]string { + keys := client.redis_conn.keys(client.jobs_key()!) or { + return error('Failed to list job keys: ${err}') + } + + mut job_ids := []string{} + for key in keys { + // Extract job ID from key (remove namespace prefix) + if key.starts_with(client.jobs_key()!) { + job_id := key[client.jobs_key()!.len..] + job_ids << job_id + } + } + + return job_ids +} + +pub fn (client Client) queues_key() ! string { + return if client.namespace.len > 0 { + "${client.namespace}:runner" + } else { + "runner" + } +} \ No newline at end of file diff --git a/src/job.v b/src/job.v index f79115c..7e97439 100644 --- a/src/job.v +++ b/src/job.v @@ -41,94 +41,3 @@ fn generate_job_id() string { random_part := rand.int_in_range(1000, 9999) or { 1234 } return 'job_${timestamp}_${random_part}' } - -// Store this job in Redis -pub fn (job Job) store_in_redis(mut conn redisclient.Redis, namespace string) ! { - job_key := '${namespace}${job.id}' - - // Store job as JSON - job_json := json.encode(job) - conn.set(job_key, job_json) or { return error('Failed to store job ${job.id}: ${err}') } -} - -// Load a job from Redis by ID -pub fn load_from_redis(mut conn redisclient.Redis, job_id string, namespace string) !Job { - job_key := '${namespace}${job_id}' - - // Get job JSON from Redis - job_json := conn.get(job_key) or { - return error('Failed to load job ${job_id} from Redis: ${err}') - } - - if job_json.len == 0 { - return error('Job ${job_id} not found in Redis') - } - - // Parse job from JSON - job := json.decode(Job, job_json) or { - return error('Failed to decode job ${job_id} JSON: ${err}') - } - - return job -} - -// Update job status in Redis -pub fn update_job_status(mut conn redisclient.Redis, namespace string, job_id string, status JobStatus) ! { - // Load job, update status, and store back - mut job := load_from_redis(mut conn, job_id, namespace)! - job.status = status - job.updated_at = time.now() - job.store_in_redis(mut conn, namespace)! -} - -// Get job status from Redis -pub fn get_status(mut conn redisclient.Redis, job_id string, namespace string) !JobStatus { - job := load_from_redis(mut conn, job_id, namespace)! - return job.status -} - -// Set job result in Redis -pub fn set_result(mut conn redisclient.Redis, namespace string, job_id string, result string) ! { - // Load job, update result, and store back - mut job := load_from_redis(mut conn, job_id, namespace)! - job.result = result - job.updated_at = time.now() - job.store_in_redis(mut conn, namespace)! -} - -// Set job error in Redis -pub fn set_error(mut conn redisclient.Redis, namespace string, job_id string, error_msg string) ! { - // Load job, update error, and store back - mut job := load_from_redis(mut conn, job_id, namespace)! - job.error = error_msg - job.status = .error - job.updated_at = time.now() - job.store_in_redis(mut conn, namespace)! -} - -// Delete job from Redis -pub fn delete_from_redis(mut conn redisclient.Redis, job_id string, namespace string) ! { - job_key := '${namespace}${job_id}' - conn.del(job_key) or { - return error('Failed to delete job ${job_id} from Redis: ${err}') - } -} - -// List all job IDs from Redis -pub fn list_all_job_ids(mut conn redisclient.Redis, namespace string) ![]string { - pattern := '${namespace}*' - keys := conn.keys(pattern) or { - return error('Failed to list job keys: ${err}') - } - - mut job_ids := []string{} - for key in keys { - // Extract job ID from key (remove namespace prefix) - if key.starts_with(namespace) { - job_id := key[namespace.len..] - job_ids << job_id - } - } - - return job_ids -} \ No newline at end of file diff --git a/src/runner.v b/src/runner.v index b986e35..862b548 100644 --- a/src/runner.v +++ b/src/runner.v @@ -18,20 +18,12 @@ pub: name string namespace string pub mut: - redis_conn &redisclient.Redis + client &Client } // Constants for Redis operations const blpop_timeout_seconds = 5 -pub fn (r Runner) queue_key() ! string { - return if r.namespace.len > 0 { - "${r.namespace}:runner:${r.name}" - } else { - "runner:${r.name}" - } -} - // Spawn an actor with the trait-based interface // This provides the common actor loop implementation that all actors can use pub fn (mut r Runner) run() ! { @@ -49,7 +41,7 @@ pub fn (mut r Runner) run() ! { log.info('Channel closed') break } - job := load_from_redis(mut r.redis_conn, job_id, r.namespace) or { + job := r.client.load_from_redis(job_id) or { log.error('Actor "${r.name}" failed to load job ${job_id}: ${err}') continue } @@ -65,7 +57,7 @@ pub fn (mut r Runner) run() ! { pub fn (mut r Runner) listen_for_jobs(channel chan string) ! { for { queue_key := r.queue_key()! - result := r.redis_conn.blpop([queue_key], 5) or { + result := r.client.redis_conn.blpop([queue_key], 5) or { continue } // blpop returns [queue_key, job_id], we want the job_id (index 1) @@ -79,10 +71,10 @@ pub fn (mut r Runner) listen_for_jobs(channel chan string) ! { pub fn (mut r Runner) run_job(job Job) ! { log.info('MyCustomActor "${r.name}": Processing job ${job.id} with custom logic') - command := "hero -s ${job.payload}" + command := "hero run -s ${job.payload}" // Update job status to started - update_job_status(mut r.redis_conn, r.namespace, job.id, .started)! + r.client.update_job_status(job.id, .started)! if job.executor.starts_with('tmux') { session_id := job.executor.all_after_first('tmux') mut t := tmux.Tmux{ @@ -119,7 +111,11 @@ pub fn (mut r Runner) run_job(job Job) ! { } else { return error('Unknown executor: ${job.executor}') } - update_job_status(mut r.redis_conn, r.namespace, job.id, .finished)! - set_result(mut r.redis_conn, r.namespace, job.id, '')! + r.client.update_job_status(job.id, .finished)! + r.client.set_result(job.id, '')! log.info('MyCustomActor "${r.name}": Job ${job.id} completed successfully') +} + +pub fn (r Runner) queue_key() ! string { + return "${r.client.queues_key()!}:${r.name}" } \ No newline at end of file