update example and execution

This commit is contained in:
Timur Gordon
2025-08-26 17:16:01 +02:00
parent 010ecc7c71
commit 4958cd68c2
7 changed files with 171 additions and 240 deletions

View File

@@ -33,13 +33,15 @@ if help_requested {
mut r := &runner.Runner{
name: runner_name
namespace: namespace
redis_conn: redisclient.new(
if redis_url.len > 0 {
redis_url
} else {
'127.0.0.1:6379'
}
)!
client: &runner.Client{
redis_conn: redisclient.new(
if redis_url.len > 0 {
redis_url
} else {
'127.0.0.1:6379'
}
)!
}
}
spawn r.run()

Binary file not shown.

View File

@@ -1,34 +1,39 @@
#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run
import time
import freeflowuniverse.herolib.baobab.runner
import herocode.runner
import freeflowuniverse.herolib.core.redisclient
const namespace = ''
const namespace = 'example'
mut r := &runner.Runner{
name: 'test_runner'
redis_conn: redisclient.new('127.0.0.1:6379')!
name: 'example_runner'
namespace: namespace
client: &runner.Client{
redis_conn: redisclient.new('127.0.0.1:6379')!
}
}
spawn r.run()
// job := runner.Job{
// id: 'test_job_1'
// caller_id: 'test_caller'
// context_id: 'test_context_1'
// runner: 'test_runner'
// executor: 'tmux.session1'
// payload: 'sleep 10\necho "Hello from job 1"\nsleep 10'
// status: .dispatched
// timeout: 30
// created_at: time.now()
// updated_at: time.now()
// }
job := runner.Job{
id: 'test_job_1'
caller_id: 'test_caller'
context_id: 'test_context_1'
runner: 'test_runner'
executor: 'tmux.session1'
payload: '!!git.list'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
}
// mut redis_conn := redisclient.new('127.0.0.1:6379')!
// job.store_in_redis(mut redis_conn, namespace)!
// mut runner_q := redis_conn.queue_get(r.queue_key()!)
// runner_q.add(job.id)!
mut client := runner.Client{
redis_conn: redisclient.new('127.0.0.1:6379')!
}
client.store_in_redis(job.id, job)!
mut runner_q := client.redis_conn.queue_get(r.queue_key()!)
runner_q.add(job.id)!
for {}

View File

@@ -1,107 +0,0 @@
#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run
import freeflowuniverse.herolib.core.redisclient
import freeflowuniverse.herolib.baobab.runner
import time
import log
// Test script to send RPC calls over Redis to test the runner
fn main() {
log.info('Starting runner RPC test script')
// Connect to Redis
mut redis := redisclient.new('127.0.0.1:6379')!
// Test runner configuration
runner_name := 'test_runner'
namespace := ''
// Create test jobs
test_jobs := [
runner.Job{
id: 'test_job_1'
caller_id: 'test_caller'
context_id: 'test_context_1'
runner: runner_name
executor: 'tmux.session1'
payload: 'echo "Hello from job 1"'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
},
runner.Job{
id: 'test_job_2'
caller_id: 'test_caller'
context_id: 'test_context_2'
runner: runner_name
executor: 'tmux.session2.window1'
payload: 'ls -la && echo "Job 2 completed"'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
},
runner.Job{
id: 'test_job_3'
caller_id: 'test_caller'
context_id: 'test_context_3'
runner: runner_name
executor: 'tmux.session3.window1.pane1'
payload: 'date && echo "Current time from job 3"'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
}
]
log.info('Storing ${test_jobs.len} test jobs in Redis and dispatching to runner queue')
// Store jobs in Redis and dispatch them to the runner queue
for job in test_jobs {
// Store job data in Redis
job.store_in_redis(mut redis, namespace) or {
log.error('Failed to store job ${job.id}: ${err}')
continue
}
// Dispatch job to runner queue by pushing job ID to the queue
queue_key := if namespace.len > 0 {
"${namespace}:runner:${runner_name}"
} else {
"runner:${runner_name}"
}
redis.rpush(queue_key, job.id) or {
log.error('Failed to dispatch job ${job.id} to queue ${queue_key}: ${err}')
continue
}
log.info('Dispatched job ${job.id} to queue ${queue_key}')
// Small delay between jobs
time.sleep(1 * time.second)
}
log.info('All test jobs dispatched. Monitoring job status...')
// Monitor job status for a while
for i in 0..30 { // Monitor for 30 seconds
log.info('--- Status check ${i + 1} ---')
for job in test_jobs {
loaded_job := runner.load_from_redis(mut redis, job.id, namespace) or {
log.error('Failed to load job ${job.id}: ${err}')
continue
}
log.info('Job ${loaded_job.id}: status=${loaded_job.status}, result="${loaded_job.result}", error="${loaded_job.error}"')
}
time.sleep(1 * time.second)
}
log.info('Test completed. Check tmux sessions to see if commands were executed.')
log.info('You can run "tmux list-sessions" to see created sessions.')
}

126
src/client.v Normal file
View File

@@ -0,0 +1,126 @@
module runner
import rand
import time
import json
import freeflowuniverse.herolib.core.redisclient
pub struct Client {
namespace string
pub mut:
redis_conn &redisclient.Redis
}
fn (client Client) job_key(job_id string) ! string {
return if client.namespace.len > 0 {
'${client.namespace}:job:${job_id}'
} else {
'job:${job_id}'
}
}
fn (client Client) jobs_key() ! string {
return if client.namespace.len > 0 {
'${client.namespace}:job'
} else {
'job'
}
}
// Store this job in Redis
pub fn (mut client Client) store_in_redis(job_id string, job Job) ! {
job_key := client.job_key(job_id)!
// Store job as JSON
job_json := json.encode(job)
client.redis_conn.hset(job_key, 'data', job_json) or { return error('Failed to store job ${job.id}: ${err}') }
}
// Load a job from Redis by ID
pub fn (mut client Client) load_from_redis(job_id string) !Job {
job_key := client.job_key(job_id)!
// Get job JSON from Redis
job_json := client.redis_conn.hget(job_key, 'data') or {
return error('Failed to load job ${job_id} from Redis: ${err}')
}
if job_json.len == 0 {
return error('Job ${job_id} not found in Redis')
}
// Parse job from JSON
job := json.decode(Job, job_json) or {
return error('Failed to decode job ${job_id} JSON: ${err}')
}
return job
}
// Update job status in Redis
pub fn (mut client Client) update_job_status(job_id string, status JobStatus) ! {
// Load job, update status, and store back
mut job := client.load_from_redis(job_id)!
job.status = status
job.updated_at = time.now()
client.store_in_redis(job_id, job)!
}
// Get job status from Redis
pub fn (mut client Client) get_status(job_id string) !JobStatus {
job := client.load_from_redis(job_id)!
return job.status
}
// Set job result in Redis
pub fn (mut client Client) set_result(job_id string, result string) ! {
// Load job, update result, and store back
mut job := client.load_from_redis(job_id)!
job.result = result
job.updated_at = time.now()
client.store_in_redis(job_id, job)!
}
// Set job error in Redis
pub fn (mut client Client) set_error(job_id string, error_msg string) ! {
// Load job, update error, and store back
mut job := client.load_from_redis(job_id)!
job.error = error_msg
job.status = .error
job.updated_at = time.now()
client.store_in_redis(job_id, job)!
}
// Delete job from Redis
pub fn (mut client Client) delete_from_redis(job_id string) ! {
job_key := client.job_key(job_id)!
client.redis_conn.del(job_key) or {
return error('Failed to delete job ${job_id} from Redis: ${err}')
}
}
// List all job IDs from Redis
pub fn (mut client Client) list_all_job_ids() ![]string {
keys := client.redis_conn.keys(client.jobs_key()!) or {
return error('Failed to list job keys: ${err}')
}
mut job_ids := []string{}
for key in keys {
// Extract job ID from key (remove namespace prefix)
if key.starts_with(client.jobs_key()!) {
job_id := key[client.jobs_key()!.len..]
job_ids << job_id
}
}
return job_ids
}
pub fn (client Client) queues_key() ! string {
return if client.namespace.len > 0 {
"${client.namespace}:runner"
} else {
"runner"
}
}

View File

@@ -41,94 +41,3 @@ fn generate_job_id() string {
random_part := rand.int_in_range(1000, 9999) or { 1234 }
return 'job_${timestamp}_${random_part}'
}
// Store this job in Redis
pub fn (job Job) store_in_redis(mut conn redisclient.Redis, namespace string) ! {
job_key := '${namespace}${job.id}'
// Store job as JSON
job_json := json.encode(job)
conn.set(job_key, job_json) or { return error('Failed to store job ${job.id}: ${err}') }
}
// Load a job from Redis by ID
pub fn load_from_redis(mut conn redisclient.Redis, job_id string, namespace string) !Job {
job_key := '${namespace}${job_id}'
// Get job JSON from Redis
job_json := conn.get(job_key) or {
return error('Failed to load job ${job_id} from Redis: ${err}')
}
if job_json.len == 0 {
return error('Job ${job_id} not found in Redis')
}
// Parse job from JSON
job := json.decode(Job, job_json) or {
return error('Failed to decode job ${job_id} JSON: ${err}')
}
return job
}
// Update job status in Redis
pub fn update_job_status(mut conn redisclient.Redis, namespace string, job_id string, status JobStatus) ! {
// Load job, update status, and store back
mut job := load_from_redis(mut conn, job_id, namespace)!
job.status = status
job.updated_at = time.now()
job.store_in_redis(mut conn, namespace)!
}
// Get job status from Redis
pub fn get_status(mut conn redisclient.Redis, job_id string, namespace string) !JobStatus {
job := load_from_redis(mut conn, job_id, namespace)!
return job.status
}
// Set job result in Redis
pub fn set_result(mut conn redisclient.Redis, namespace string, job_id string, result string) ! {
// Load job, update result, and store back
mut job := load_from_redis(mut conn, job_id, namespace)!
job.result = result
job.updated_at = time.now()
job.store_in_redis(mut conn, namespace)!
}
// Set job error in Redis
pub fn set_error(mut conn redisclient.Redis, namespace string, job_id string, error_msg string) ! {
// Load job, update error, and store back
mut job := load_from_redis(mut conn, job_id, namespace)!
job.error = error_msg
job.status = .error
job.updated_at = time.now()
job.store_in_redis(mut conn, namespace)!
}
// Delete job from Redis
pub fn delete_from_redis(mut conn redisclient.Redis, job_id string, namespace string) ! {
job_key := '${namespace}${job_id}'
conn.del(job_key) or {
return error('Failed to delete job ${job_id} from Redis: ${err}')
}
}
// List all job IDs from Redis
pub fn list_all_job_ids(mut conn redisclient.Redis, namespace string) ![]string {
pattern := '${namespace}*'
keys := conn.keys(pattern) or {
return error('Failed to list job keys: ${err}')
}
mut job_ids := []string{}
for key in keys {
// Extract job ID from key (remove namespace prefix)
if key.starts_with(namespace) {
job_id := key[namespace.len..]
job_ids << job_id
}
}
return job_ids
}

View File

@@ -18,20 +18,12 @@ pub:
name string
namespace string
pub mut:
redis_conn &redisclient.Redis
client &Client
}
// Constants for Redis operations
const blpop_timeout_seconds = 5
pub fn (r Runner) queue_key() ! string {
return if r.namespace.len > 0 {
"${r.namespace}:runner:${r.name}"
} else {
"runner:${r.name}"
}
}
// Spawn an actor with the trait-based interface
// This provides the common actor loop implementation that all actors can use
pub fn (mut r Runner) run() ! {
@@ -49,7 +41,7 @@ pub fn (mut r Runner) run() ! {
log.info('Channel closed')
break
}
job := load_from_redis(mut r.redis_conn, job_id, r.namespace) or {
job := r.client.load_from_redis(job_id) or {
log.error('Actor "${r.name}" failed to load job ${job_id}: ${err}')
continue
}
@@ -65,7 +57,7 @@ pub fn (mut r Runner) run() ! {
pub fn (mut r Runner) listen_for_jobs(channel chan string) ! {
for {
queue_key := r.queue_key()!
result := r.redis_conn.blpop([queue_key], 5) or {
result := r.client.redis_conn.blpop([queue_key], 5) or {
continue
}
// blpop returns [queue_key, job_id], we want the job_id (index 1)
@@ -79,10 +71,10 @@ pub fn (mut r Runner) listen_for_jobs(channel chan string) ! {
pub fn (mut r Runner) run_job(job Job) ! {
log.info('MyCustomActor "${r.name}": Processing job ${job.id} with custom logic')
command := "hero -s ${job.payload}"
command := "hero run -s ${job.payload}"
// Update job status to started
update_job_status(mut r.redis_conn, r.namespace, job.id, .started)!
r.client.update_job_status(job.id, .started)!
if job.executor.starts_with('tmux') {
session_id := job.executor.all_after_first('tmux')
mut t := tmux.Tmux{
@@ -119,7 +111,11 @@ pub fn (mut r Runner) run_job(job Job) ! {
} else {
return error('Unknown executor: ${job.executor}')
}
update_job_status(mut r.redis_conn, r.namespace, job.id, .finished)!
set_result(mut r.redis_conn, r.namespace, job.id, '')!
r.client.update_job_status(job.id, .finished)!
r.client.set_result(job.id, '')!
log.info('MyCustomActor "${r.name}": Job ${job.id} completed successfully')
}
pub fn (r Runner) queue_key() ! string {
return "${r.client.queues_key()!}:${r.name}"
}