commit 010ecc7c71b268eb8783dcc1d9dc5792cf4ac887 Author: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Tue Aug 26 14:47:52 2025 +0200 initial commit diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..01072ca --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.v] +indent_style = tab diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..9a98968 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,8 @@ +* text=auto eol=lf +*.bat eol=crlf + +*.v linguist-language=V +*.vv linguist-language=V +*.vsh linguist-language=V +v.mod linguist-language=V +.vdocignore linguist-language=ignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..264e08c --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Binaries for programs and plugins +main +runner_v +*.exe +*.exe~ +*.so +*.dylib +*.dll + +# Ignore binary output folders +bin/ + +# Ignore common editor/system specific metadata +.DS_Store +.idea/ +.vscode/ +*.iml + +# ENV +.env + +# vweb and database +*.db +*.js + +cmd/runner \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..f34fe9e --- /dev/null +++ b/README.md @@ -0,0 +1,280 @@ +# Hero Runner V + +A V language implementation of the Hero Baobab runner system for distributed job execution. This runner provides a Redis-based job queue system with support for various executors including tmux sessions. + +## Overview + +The Hero Runner V is a lightweight, high-performance job execution system built in V lang. It connects to Redis for job queuing and supports multiple execution environments through configurable executors. + +## Features + +- **Redis-based Job Queue**: Reliable job queuing and status tracking +- **Multiple Executors**: Support for tmux sessions, windows, and panes +- **Job Lifecycle Management**: Complete job status tracking (dispatched → started → finished/error) +- **Configurable Timeouts**: Per-job timeout configuration +- **Environment Variables**: Job-specific environment variable support +- **Namespace Support**: Multi-tenant runner organization +- **CLI Interface**: Command-line interface with flag parsing + +## Architecture + +```text +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Redis Queue │ │ Runner │ │ Executor │ +│ │ │ │ │ │ +│ Job Storage │◄──►│ Job Processor │◄──►│ tmux.session │ +│ Status Updates │ │ Status Manager │ │ tmux.window │ +│ │ │ │ │ tmux.pane │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ +``` + +## Installation + +### Prerequisites + +- V language compiler +- Redis server +- Herolib dependencies + +### Quick Install + +```bash +# Run the installation script +./scripts/install.sh +``` + +This will: +1. Install V language and Herolib +2. Set up module dependencies +3. Link the runner module to vmodules + +### Manual Installation + +```bash +# Install V & Herolib +curl 'https://raw.githubusercontent.com/freeflowuniverse/herolib/refs/heads/development/install_v.sh' > /tmp/install_v.sh +bash /tmp/install_v.sh --analyzer --herolib + +# Install herolib +cd ${HOME}/code/github/freeflowuniverse/herolib +bash install_herolib.vsh + +# Link runner module +mkdir -p "${HOME}/.vmodules/herocode" +ln -s "/path/to/runner_v/src" "${HOME}/.vmodules/herocode/runner" +``` + +## Usage + +### Starting the Runner + +```bash +# Start with default settings +./scripts/run.sh + +# Or run directly with custom options +./cmd/runner.vsh --name my_runner --namespace production --redis_url redis://localhost:6379 +``` + +### Command Line Options + +```bash +Usage: runner [flags] + +Flags: + -n --name name of the runner (default: test_runner) + -s --namespace namespace of the runner (default: '') + -r --redis_url redis url (default: 127.0.0.1:6379) + -h --help Show help message +``` + +### Creating and Dispatching Jobs + +```v +import herocode.runner +import freeflowuniverse.herolib.core.redisclient +import time + +// Create a job +job := runner.new_job(runner.Job{ + caller_id: 'my_app' + context_id: 'task_123' + runner: 'my_runner' + executor: 'tmux.session1.window1' + payload: 'echo "Hello World" && sleep 5' + timeout: 30 +})! + +// Connect to Redis and store the job +mut redis_conn := redisclient.new('127.0.0.1:6379')! +job.store_in_redis(mut redis_conn, 'production:')! + +// Add job to runner queue +runner_queue_key := 'production:runner:my_runner' +mut queue := redis_conn.queue_get(runner_queue_key) +queue.add(job.id)! +``` + +## Job Structure + +Jobs contain the following fields: + +```v +pub struct Job { +pub mut: + id string // Unique job identifier + caller_id string // ID of the calling service + context_id string // Context/session identifier + runner string // Target runner name + executor string // Execution environment (e.g., tmux.session1) + payload string // Script/command to execute + status JobStatus // Current job status + result string // Job execution result + error string // Error message if failed + timeout int // Timeout in seconds + env_vars map[string]string // Environment variables + created_at time.Time // Job creation timestamp + updated_at time.Time // Last update timestamp +} +``` + +### Job Status Lifecycle + +```text +dispatched → started → finished + ↓ ↓ ↑ + └─────── error ──────┘ +``` + +- **dispatched**: Job is queued and waiting for processing +- **started**: Job execution has begun +- **finished**: Job completed successfully +- **error**: Job failed during execution + +## Executor Types + +The runner supports various executor types for different execution environments: + +### Tmux Executors + +- `tmux.session_name` - Execute in a tmux session +- `tmux.session_name.window_name` - Execute in a specific window +- `tmux.session_name.window_name.pane_id` - Execute in a specific pane + +Example: +```v +job := runner.Job{ + // ... other fields + executor: 'tmux.dev_session.main_window.pane1' + payload: 'npm run build && npm test' +} +``` + +## Examples + +### Basic Runner + +```v +#!/usr/bin/env -S v run + +import herocode.runner +import freeflowuniverse.herolib.core.redisclient + +mut r := &runner.Runner{ + name: 'basic_runner' + namespace: 'development' + redis_conn: redisclient.new('127.0.0.1:6379')! +} + +// Start the runner +spawn r.run() + +// Keep running +for {} +``` + +### Job Submission Script + +See `examples/test_runner_rpc.vsh` for a complete example of submitting multiple jobs to a runner. + +## Testing + +Run the test suite: + +```bash +# Run all tests +v test src/ + +# Run specific test files +v test src/runner_test.v +v test src/job_test.v +``` + +## Configuration + +### Environment Variables + +- `REDIS_URL`: Redis connection URL (default: 127.0.0.1:6379) +- `RUNNER_NAME`: Default runner name +- `RUNNER_NAMESPACE`: Default namespace + +### Redis Keys + +The runner uses the following Redis key patterns: + +- Jobs: `{namespace}job_{id}` +- Runner Queues: `{namespace}runner:{runner_name}` +- Status Updates: Stored within job objects + +## Development + +### Project Structure + +``` +runner_v/ +├── cmd/ # Command-line applications +│ └── runner.vsh # Main runner executable +├── src/ # Source code +│ ├── runner.v # Core runner implementation +│ ├── job.v # Job structure and operations +│ ├── factory.v # Job creation utilities +│ └── *_test.v # Test files +├── examples/ # Usage examples +├── scripts/ # Build and deployment scripts +└── README.md # This file +``` + +### Building + +```bash +# Build the runner +v -prod cmd/runner.vsh -o runner + +# Run in development mode +v run cmd/runner.vsh +``` + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests for new functionality +5. Run the test suite +6. Submit a pull request + +## License + +MIT License - see LICENSE file for details. + +## Related Projects + +- [Herolib](https://github.com/freeflowuniverse/herolib) - Core utilities and libraries +- [Hero Baobab](https://github.com/freeflowuniverse/baobab) - Distributed actor system + +## Support + +For issues and questions: +- Open an issue on GitHub +- Check the examples directory for usage patterns +- Review the test files for implementation details \ No newline at end of file diff --git a/cmd/runner.vsh b/cmd/runner.vsh new file mode 100755 index 0000000..12ec76a --- /dev/null +++ b/cmd/runner.vsh @@ -0,0 +1,47 @@ +#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run + +import os +import time +import flag +import herocode.runner +import freeflowuniverse.herolib.core.redisclient + +mut fp := flag.new_flag_parser(os.args) +fp.application('runner') +fp.version('v0.1.0') +fp.description('Runner in Vlang') +fp.skip_executable() + +// Define the flags +runner_name := fp.string('name', `n`, 'test_runner', 'name of the runner') +namespace := fp.string('namespace', `s`, '', 'namespace of the runner') +redis_url := fp.string('redis_url', `r`, '127.0.0.1:6379', 'redis url') +help_requested := fp.bool('help', `h`, false, 'Show help message') + +// Parse the arguments +remaining_args := fp.finalize() or { + eprintln('Error parsing arguments: ${err}') + println(fp.usage()) + exit(1) +} + +if help_requested { + println(fp.usage()) + exit(0) +} + +mut r := &runner.Runner{ + name: runner_name + namespace: namespace + redis_conn: redisclient.new( + if redis_url.len > 0 { + redis_url + } else { + '127.0.0.1:6379' + } + )! +} + +spawn r.run() + +for {} \ No newline at end of file diff --git a/examples/runner b/examples/runner new file mode 100755 index 0000000..a66ce72 Binary files /dev/null and b/examples/runner differ diff --git a/examples/runner.vsh b/examples/runner.vsh new file mode 100755 index 0000000..12c7f0f --- /dev/null +++ b/examples/runner.vsh @@ -0,0 +1,34 @@ +#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run + +import time +import freeflowuniverse.herolib.baobab.runner +import freeflowuniverse.herolib.core.redisclient + +const namespace = '' + +mut r := &runner.Runner{ + name: 'test_runner' + redis_conn: redisclient.new('127.0.0.1:6379')! +} + +spawn r.run() + +// job := runner.Job{ +// id: 'test_job_1' +// caller_id: 'test_caller' +// context_id: 'test_context_1' +// runner: 'test_runner' +// executor: 'tmux.session1' +// payload: 'sleep 10\necho "Hello from job 1"\nsleep 10' +// status: .dispatched +// timeout: 30 +// created_at: time.now() +// updated_at: time.now() +// } + +// mut redis_conn := redisclient.new('127.0.0.1:6379')! +// job.store_in_redis(mut redis_conn, namespace)! +// mut runner_q := redis_conn.queue_get(r.queue_key()!) +// runner_q.add(job.id)! + +for {} \ No newline at end of file diff --git a/examples/test_runner_rpc.vsh b/examples/test_runner_rpc.vsh new file mode 100644 index 0000000..5b9873d --- /dev/null +++ b/examples/test_runner_rpc.vsh @@ -0,0 +1,107 @@ +#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run + +import freeflowuniverse.herolib.core.redisclient +import freeflowuniverse.herolib.baobab.runner +import time +import log + +// Test script to send RPC calls over Redis to test the runner +fn main() { + log.info('Starting runner RPC test script') + + // Connect to Redis + mut redis := redisclient.new('127.0.0.1:6379')! + + // Test runner configuration + runner_name := 'test_runner' + namespace := '' + + // Create test jobs + test_jobs := [ + runner.Job{ + id: 'test_job_1' + caller_id: 'test_caller' + context_id: 'test_context_1' + runner: runner_name + executor: 'tmux.session1' + payload: 'echo "Hello from job 1"' + status: .dispatched + timeout: 30 + created_at: time.now() + updated_at: time.now() + }, + runner.Job{ + id: 'test_job_2' + caller_id: 'test_caller' + context_id: 'test_context_2' + runner: runner_name + executor: 'tmux.session2.window1' + payload: 'ls -la && echo "Job 2 completed"' + status: .dispatched + timeout: 30 + created_at: time.now() + updated_at: time.now() + }, + runner.Job{ + id: 'test_job_3' + caller_id: 'test_caller' + context_id: 'test_context_3' + runner: runner_name + executor: 'tmux.session3.window1.pane1' + payload: 'date && echo "Current time from job 3"' + status: .dispatched + timeout: 30 + created_at: time.now() + updated_at: time.now() + } + ] + + log.info('Storing ${test_jobs.len} test jobs in Redis and dispatching to runner queue') + + // Store jobs in Redis and dispatch them to the runner queue + for job in test_jobs { + // Store job data in Redis + job.store_in_redis(mut redis, namespace) or { + log.error('Failed to store job ${job.id}: ${err}') + continue + } + + // Dispatch job to runner queue by pushing job ID to the queue + queue_key := if namespace.len > 0 { + "${namespace}:runner:${runner_name}" + } else { + "runner:${runner_name}" + } + + redis.rpush(queue_key, job.id) or { + log.error('Failed to dispatch job ${job.id} to queue ${queue_key}: ${err}') + continue + } + + log.info('Dispatched job ${job.id} to queue ${queue_key}') + + // Small delay between jobs + time.sleep(1 * time.second) + } + + log.info('All test jobs dispatched. Monitoring job status...') + + // Monitor job status for a while + for i in 0..30 { // Monitor for 30 seconds + log.info('--- Status check ${i + 1} ---') + + for job in test_jobs { + loaded_job := runner.load_from_redis(mut redis, job.id, namespace) or { + log.error('Failed to load job ${job.id}: ${err}') + continue + } + + log.info('Job ${loaded_job.id}: status=${loaded_job.status}, result="${loaded_job.result}", error="${loaded_job.error}"') + } + + time.sleep(1 * time.second) + } + + log.info('Test completed. Check tmux sessions to see if commands were executed.') + log.info('You can run "tmux list-sessions" to see created sessions.') +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..d748c3a --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# Install V & Herolib +curl 'https://raw.githubusercontent.com/freeflowuniverse/herolib/refs/heads/development/install_v.sh' > /tmp/install_v.sh +bash /tmp/install_v.sh --herolib + +pushd ${HOME}/code/github/freeflowuniverse/herolib +./install_herolib.vsh +popd + +# Install runner +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +MODULE_DIR="${SCRIPT_DIR}/../" + +# Link module to vmodules +mkdir -p "${HOME}/.vmodules/herocode" +unlink ${HOME}/.vmodules/herocode/runner +ln -s ${MODULE_DIR}/src ${HOME}/.vmodules/herocode/runner \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh new file mode 100755 index 0000000..7e57a5d --- /dev/null +++ b/scripts/run.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +MODULE_DIR="${SCRIPT_DIR}/../" + +${MODULE_DIR}/cmd/runner.vsh \ No newline at end of file diff --git a/scripts/test.sh b/scripts/test.sh new file mode 100755 index 0000000..05b6434 --- /dev/null +++ b/scripts/test.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# serve.sh - Build optimized WASM and serve with Caddy + Brotli compression +set -e + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +SOURCE_DIR="${SCRIPT_DIR}/../src" + +pushd ${SOURCE_DIR} +vtest +popd \ No newline at end of file diff --git a/src/README.md b/src/README.md new file mode 100644 index 0000000..d8fbede --- /dev/null +++ b/src/README.md @@ -0,0 +1,269 @@ +# V Lang Actor Interface + +This module provides a V lang port of the Rust actor trait interface for the Hero Baobab system. It enables implementing actors in V lang with a simple interface: implement `process_job` and use `spawn` to run the actor. + +## Architecture + +The V lang actor interface mirrors the Rust implementation with these key components: + +```text +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ SyncActor │ │ AsyncActor │ │ MyCustomActor │ +│ │ │ │ │ │ +│ process_job() │ │ process_job() │ │ process_job() │ +│ (sequential) │ │ (concurrent) │ │ (custom logic) │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ + │ │ │ + └───────────────┬───────────────────────────────┘ + │ + ┌───────▼───────┐ + │ Actor Interface│ + │ │ + │ spawn_actor() │ + │ process_job() │ + │ config │ + └───────────────┘ +``` + +## Core Components + +### Actor Interface + +All actors must implement the `Actor` interface: + +```v +pub interface Actor { + // Process a single job - must be implemented by concrete actors + process_job(job Job, mut redis_conn redisclient.Redis) ! + + // Get the actor type identifier + actor_type() string + + // Get the actor ID + actor_id() string + + // Get the Redis URL + redis_url() string + + // Get the database path + db_path() string + + // Check if tasks should be preserved + preserve_tasks() bool +} +``` + +### ActorConfig + +Configuration structure for all actors: + +```v +pub struct ActorConfig { +pub: + actor_id string + db_path string + redis_url string + preserve_tasks bool + default_timeout ?time.Duration // Optional timeout for async actors +} +``` + +### Job Structure + +Jobs processed by actors: + +```v +pub struct Job { +pub mut: + id string + caller_id string + context_id string + script string + status JobStatus + result string + error string + created_at time.Time + started_at time.Time + finished_at time.Time +} +``` + +## Built-in Actor Implementations + +### SyncActor + +Processes jobs sequentially, one at a time: + +```v +import freeflowuniverse.herolib.lib.baobab.actor + +// Create configuration +config := actor.new_actor_config( + 'sync_actor_1', + '/path/to/database', + 'redis://localhost:6379', + false +) + +// Create and spawn sync actor +sync_actor := actor.new_sync_actor(config)! +mut shutdown_chan := chan bool{cap: 1} +go actor.spawn_actor(sync_actor, mut shutdown_chan) + +// Later, shutdown the actor +shutdown_chan <- true +``` + +### AsyncActor + +Processes jobs concurrently with timeout support: + +```v +import time +import freeflowuniverse.herolib.lib.baobab.actor + +// Create configuration with timeout +mut config := actor.new_actor_config( + 'async_actor_1', + '/path/to/database', + 'redis://localhost:6379', + false +) +config = config.with_default_timeout(time.Duration(300 * time.second)) + +// Create and spawn async actor +async_actor := actor.new_async_actor(config)! +mut shutdown_chan := chan bool{cap: 1} +go actor.spawn_actor(async_actor, mut shutdown_chan) + +// Later, shutdown the actor +shutdown_chan <- true +``` + +## Creating Custom Actors + +To implement a custom actor, simply implement the `Actor` interface: + +```v +import freeflowuniverse.herolib.lib.baobab.actor + +struct MyCustomActor { +pub: + config actor.ActorConfig +mut: + engine rhai.Engine + // Add your custom fields here +} + +pub fn new_custom_actor(config actor.ActorConfig) !MyCustomActor { + mut engine := rhai.new_engine()! + + return MyCustomActor{ + config: config + engine: engine + } +} + +// Implement the Actor interface +pub fn (mut actor MyCustomActor) process_job(job actor.Job, mut redis_conn redisclient.Redis) ! { + // Your custom job processing logic here + + // Update job status to started + actor.update_job_status(mut redis_conn, job.id, .started)! + + // Execute the script (or your custom logic) + result := actor.execute_job_with_engine(mut actor.engine, job, actor.config.db_path)! + + // Update job status to finished and set result + actor.update_job_status(mut redis_conn, job.id, .finished)! + actor.set_job_result(mut redis_conn, job.id, result)! + + // Clean up if needed + actor.cleanup_job(mut redis_conn, job.id, job.context_id, actor.config.preserve_tasks)! +} + +// Implement required interface methods +pub fn (actor MyCustomActor) actor_type() string { return 'MyCustomActor' } +pub fn (actor MyCustomActor) actor_id() string { return actor.config.actor_id } +pub fn (actor MyCustomActor) redis_url() string { return actor.config.redis_url } +pub fn (actor MyCustomActor) db_path() string { return actor.config.db_path } +pub fn (actor MyCustomActor) preserve_tasks() bool { return actor.config.preserve_tasks } + +// Usage +config := actor.new_actor_config('my_actor', '/db/path', 'redis://localhost:6379', false) +custom_actor := new_custom_actor(config)! +mut shutdown_chan := chan bool{cap: 1} +go actor.spawn_actor(custom_actor, mut shutdown_chan) +``` + +## Key Features + +### Unified Interface +- Same `spawn_actor()` function works with all actor implementations +- Consistent configuration and lifecycle management +- Clean separation between actor logic and infrastructure + +### Redis Integration +- Automatic Redis connection management +- Job polling from actor-specific queues (`hero:job:actor_queue:{actor_id}`) +- Job status tracking and result storage +- Optional job cleanup based on `preserve_tasks` setting + +### Script Execution +- Rhai script engine integration +- Automatic job context setup (DB_PATH, CALLER_ID, CONTEXT_ID) +- Error handling and status updates + +### Graceful Shutdown +- Channel-based shutdown signaling +- Proper cleanup of resources +- Support for cancelling running jobs (AsyncActor) + +## Job Processing Flow + +1. **Job Polling**: Actor polls Redis queue using BLPOP +2. **Job Loading**: Load job details from Redis hash +3. **Status Update**: Mark job as "started" +4. **Script Execution**: Execute Rhai script with job context +5. **Result Storage**: Store result or error in Redis +6. **Status Update**: Mark job as "finished" or "error" +7. **Cleanup**: Optionally remove job from Redis + +## Error Handling + +All operations include comprehensive error handling: +- Redis connection failures +- Job loading errors +- Script execution errors +- Timeout handling (AsyncActor) +- Graceful degradation and logging + +## Migration from Rust + +This V lang implementation provides the same functionality as the Rust version: +- ✅ Actor trait abstraction +- ✅ Sync and async actor implementations +- ✅ Unified spawn interface +- ✅ Redis job queue integration +- ✅ Rhai script execution +- ✅ Job lifecycle management +- ✅ Error handling and logging +- ✅ Graceful shutdown + +## Dependencies + +The actor interface depends on these herolib modules: +- `freeflowuniverse.herolib.clients.redisclient` - Redis operations +- `freeflowuniverse.herolib.data.rhai` - Script execution +- `freeflowuniverse.herolib.core.base` - Base utilities + +## Usage Summary + +To implement an actor in V lang: + +1. **Create ActorConfig**: Configure actor ID, database path, Redis URL, etc. +2. **Implement Actor Interface**: Create a struct that implements the `Actor` interface +3. **Implement process_job()**: Add your job processing logic +4. **Use spawn()**: Call `spawn_actor()` to start the actor loop + +That's it! The interface handles all the Redis polling, job management, and infrastructure concerns automatically. diff --git a/src/factory.v b/src/factory.v new file mode 100644 index 0000000..ba4ba62 --- /dev/null +++ b/src/factory.v @@ -0,0 +1,27 @@ +module runner + +import freeflowuniverse.herolib.core.redisclient +import time + +// Build the job from the current builder configuration +pub fn new_job(job Job) !Job { + // Validate required fields + if job.caller_id.len == 0 { + return error('caller_id is required') + } + + if job.context_id.len == 0 { + return error('context_id is required') + } + + if job.payload.len == 0 { + return error('payload is required') + } + + return Job{ + ...job + id: if job.id.len == 0 { generate_job_id() } else { job.id } + created_at: time.now() + timeout: if job.timeout == 0 { 300 } else { job.timeout } + } +} \ No newline at end of file diff --git a/src/job.v b/src/job.v new file mode 100644 index 0000000..f79115c --- /dev/null +++ b/src/job.v @@ -0,0 +1,134 @@ +module runner + +import rand +import time +import json +import freeflowuniverse.herolib.core.redisclient + +// Job status enumeration +pub enum JobStatus { + dispatched + started + finished + error +} + +// Representation of a script execution request +// This structure contains all the information needed to execute a script +// on a actor service, including the script content, dependencies, and metadata +pub struct Job { +pub mut: + id string + caller_id string + context_id string + runner string + executor string + payload string + status JobStatus + result string + error string + timeout int + env_vars map[string]string + created_at time.Time + updated_at time.Time +} + +// Generate a unique job ID +fn generate_job_id() string { + // Simple UUID-like ID generation + now := time.now() + timestamp := now.unix() + random_part := rand.int_in_range(1000, 9999) or { 1234 } + return 'job_${timestamp}_${random_part}' +} + +// Store this job in Redis +pub fn (job Job) store_in_redis(mut conn redisclient.Redis, namespace string) ! { + job_key := '${namespace}${job.id}' + + // Store job as JSON + job_json := json.encode(job) + conn.set(job_key, job_json) or { return error('Failed to store job ${job.id}: ${err}') } +} + +// Load a job from Redis by ID +pub fn load_from_redis(mut conn redisclient.Redis, job_id string, namespace string) !Job { + job_key := '${namespace}${job_id}' + + // Get job JSON from Redis + job_json := conn.get(job_key) or { + return error('Failed to load job ${job_id} from Redis: ${err}') + } + + if job_json.len == 0 { + return error('Job ${job_id} not found in Redis') + } + + // Parse job from JSON + job := json.decode(Job, job_json) or { + return error('Failed to decode job ${job_id} JSON: ${err}') + } + + return job +} + +// Update job status in Redis +pub fn update_job_status(mut conn redisclient.Redis, namespace string, job_id string, status JobStatus) ! { + // Load job, update status, and store back + mut job := load_from_redis(mut conn, job_id, namespace)! + job.status = status + job.updated_at = time.now() + job.store_in_redis(mut conn, namespace)! +} + +// Get job status from Redis +pub fn get_status(mut conn redisclient.Redis, job_id string, namespace string) !JobStatus { + job := load_from_redis(mut conn, job_id, namespace)! + return job.status +} + +// Set job result in Redis +pub fn set_result(mut conn redisclient.Redis, namespace string, job_id string, result string) ! { + // Load job, update result, and store back + mut job := load_from_redis(mut conn, job_id, namespace)! + job.result = result + job.updated_at = time.now() + job.store_in_redis(mut conn, namespace)! +} + +// Set job error in Redis +pub fn set_error(mut conn redisclient.Redis, namespace string, job_id string, error_msg string) ! { + // Load job, update error, and store back + mut job := load_from_redis(mut conn, job_id, namespace)! + job.error = error_msg + job.status = .error + job.updated_at = time.now() + job.store_in_redis(mut conn, namespace)! +} + +// Delete job from Redis +pub fn delete_from_redis(mut conn redisclient.Redis, job_id string, namespace string) ! { + job_key := '${namespace}${job_id}' + conn.del(job_key) or { + return error('Failed to delete job ${job_id} from Redis: ${err}') + } +} + +// List all job IDs from Redis +pub fn list_all_job_ids(mut conn redisclient.Redis, namespace string) ![]string { + pattern := '${namespace}*' + keys := conn.keys(pattern) or { + return error('Failed to list job keys: ${err}') + } + + mut job_ids := []string{} + for key in keys { + // Extract job ID from key (remove namespace prefix) + if key.starts_with(namespace) { + job_id := key[namespace.len..] + job_ids << job_id + } + } + + return job_ids +} \ No newline at end of file diff --git a/src/job_test.v b/src/job_test.v new file mode 100644 index 0000000..47f0824 --- /dev/null +++ b/src/job_test.v @@ -0,0 +1,194 @@ +module runner + +import time + +fn test_script_type_conversion() { + // Test ScriptType to string conversion + assert ScriptType.osis.str() == 'osis' + assert ScriptType.sal.str() == 'sal' + assert ScriptType.v.str() == 'v' + assert ScriptType.python.str() == 'python' + +} + +fn test_job_status_conversion() { + assert JobStatus.dispatched.str() == 'dispatched' + assert JobStatus.started.str() == 'started' + assert JobStatus.finished.str() == 'finished' + assert JobStatus.error.str() == 'error' +} + +fn test_script_type_from_string() { + assert 'osis' == ScriptType.osis.str() + assert 'sal' == ScriptType.sal.str() + assert 'v' == ScriptType.v.str() + assert 'python' == ScriptType.python.str() + + // Test invalid string + if result := ScriptType.from_string('invalid') { + panic('Should fail for invalid script type') + } +} + +fn test_job_status_from_string() { + assert 'dispatched' == JobStatus.dispatched.str() + assert 'started' == JobStatus.started.str() + assert 'finished' == JobStatus.finished.str() + assert 'error' == JobStatus.error.str() + + // Test invalid string + if result := JobStatus.from_string('invalid') { + panic('Should fail for invalid job status') + } +} + +fn test_script_type_actor_queue_suffix() { + assert ScriptType.osis.actor_queue_suffix() == 'osis' + assert ScriptType.sal.actor_queue_suffix() == 'sal' + assert ScriptType.v.actor_queue_suffix() == 'v' + assert ScriptType.python.actor_queue_suffix() == 'python' +} + +fn test_new_job_creation()! { + // Test basic job creation using the old function + job := new_job( + caller_id: 'test_caller' + context_id: 'test_context' + script: 'print("hello")' + script_type: ScriptType.osis + )! + + assert job.caller_id == 'test_caller' + assert job.context_id == 'test_context' + assert job.script == 'print("hello")' + assert job.script_type == ScriptType.osis + assert job.status == JobStatus.dispatched + assert job.timeout == 300 // default timeout + assert job.retries == 0 + assert job.concurrent == false + assert job.id.len > 0 // should have generated an ID +} + +fn test_job_factory() { + // Test job creation using factory with custom fields + mut job_template := Job{ + caller_id: 'test_caller' + context_id: 'test_context' + script: 'print("test")' + script_type: .v + timeout: 60 + retries: 3 + concurrent: true + env_vars: {'TEST_VAR': 'test_value'} + prerequisites: ['job_1'] + dependents: ['job_2'] + } + + job := new_job(job_template) or { panic('Failed to create job: ${err}') } + + assert job.caller_id == 'test_caller' + assert job.context_id == 'test_context' + assert job.script == 'print("test")' + assert job.script_type == ScriptType.v + assert job.timeout == 60 + assert job.retries == 3 + assert job.concurrent == true + assert job.env_vars['TEST_VAR'] == 'test_value' + assert job.prerequisites.len == 1 + assert job.prerequisites[0] == 'job_1' + assert job.dependents.len == 1 + assert job.dependents[0] == 'job_2' + assert job.id.len > 0 // should have generated an ID + assert job.created_at != time.Time{} // should have set creation time +} + +fn test_job_factory_validation() { + // Test missing caller_id + mut invalid_job := Job{ + context_id: 'test_context' + script: 'test_script' + script_type: .v + } + + if result := new_job(invalid_job) { + panic('Should fail without caller_id') + } + + // Test missing context_id + invalid_job = Job{ + caller_id: 'test_caller' + script: 'test_script' + script_type: .v + } + + if result := new_job(invalid_job) { + panic('Should fail without context_id') + } + + // Test missing script + invalid_job = Job{ + caller_id: 'test_caller' + context_id: 'test_context' + script_type: .v + } + + if result := new_job(invalid_job) { + panic('Should fail without script') + } +} + +fn test_job_factory_with_env_vars() { + mut job_template := Job{ + caller_id: 'test_caller' + context_id: 'test_context' + script: 'test_script' + script_type: .v + env_vars: { + 'KEY1': 'value1' + 'KEY2': 'value2' + 'KEY3': 'value3' + } + } + + job := new_job(job_template) or { panic('Failed to create job with env vars: ${err}') } + + assert job.env_vars.len == 3 + assert job.env_vars['KEY1'] == 'value1' + assert job.env_vars['KEY2'] == 'value2' + assert job.env_vars['KEY3'] == 'value3' +} + +fn test_job_factory_with_dependencies() { + mut job_template := Job{ + caller_id: 'test_caller' + context_id: 'test_context' + script: 'test_script' + script_type: .v + prerequisites: ['job1', 'job2'] + dependents: ['job3', 'job4'] + } + + job := new_job(job_template) or { panic('Failed to create job with dependencies: ${err}') } + + assert job.prerequisites.len == 2 + assert job.prerequisites.contains('job1') + assert job.prerequisites.contains('job2') + assert job.dependents.len == 2 + assert job.dependents.contains('job3') + assert job.dependents.contains('job4') +} + +fn test_job_factory_with_custom_id() { + mut job_template := Job{ + id: 'custom_job_id' + caller_id: 'test_caller' + context_id: 'test_context' + script: 'test_script' + script_type: .v + } + + job := new_job(job_template) or { panic('Failed to create job with custom ID: ${err}') } + + assert job.id == 'custom_job_id' + assert job.created_at != time.Time{} // should still set creation time +} \ No newline at end of file diff --git a/src/runner.v b/src/runner.v new file mode 100644 index 0000000..b986e35 --- /dev/null +++ b/src/runner.v @@ -0,0 +1,125 @@ +module runner + +import log +import freeflowuniverse.herolib.core.redisclient +import freeflowuniverse.herolib.osal.tmux + +// ActorConfig holds configuration for actor instances +pub struct RunnerConfig { +pub: + name string + namespace string + redis_url string +} + +// Runner interface defines the common behavior for all actor implementations +pub struct Runner { +pub: + name string + namespace string +pub mut: + redis_conn &redisclient.Redis +} + +// Constants for Redis operations +const blpop_timeout_seconds = 5 + +pub fn (r Runner) queue_key() ! string { + return if r.namespace.len > 0 { + "${r.namespace}:runner:${r.name}" + } else { + "runner:${r.name}" + } +} + +// Spawn an actor with the trait-based interface +// This provides the common actor loop implementation that all actors can use +pub fn (mut r Runner) run() ! { + log.info('Starting ${r.name} actor') + + // Create channel for job processing + + job_chan := chan string{cap: 1} + spawn r.listen_for_jobs(job_chan) + + // Main actor loop + + for { + job_id := <- job_chan or { + log.info('Channel closed') + break + } + job := load_from_redis(mut r.redis_conn, job_id, r.namespace) or { + log.error('Actor "${r.name}" failed to load job ${job_id}: ${err}') + continue + } + r.run_job(job) or { + log.error('Actor "${r.name}" failed to process job ${job.id}: ${err}') + continue + } + } + + log.info('Actor "${r.name}" shutting down') +} + +pub fn (mut r Runner) listen_for_jobs(channel chan string) ! { + for { + queue_key := r.queue_key()! + result := r.redis_conn.blpop([queue_key], 5) or { + continue + } + // blpop returns [queue_key, job_id], we want the job_id (index 1) + if result.len > 1 { + channel <- result[1] + } + } +} + +// Implementation of Actor interface for MyCustomActor +pub fn (mut r Runner) run_job(job Job) ! { + log.info('MyCustomActor "${r.name}": Processing job ${job.id} with custom logic') + + command := "hero -s ${job.payload}" + + // Update job status to started + update_job_status(mut r.redis_conn, r.namespace, job.id, .started)! + if job.executor.starts_with('tmux') { + session_id := job.executor.all_after_first('tmux') + mut t := tmux.Tmux{ + sessionid: session_id.trim_string_left('.') + } + mut session := t.session_create( + name: session_id.trim_string_left('.'), + reset: true + ) or { return error('Failed to create session: ${err}') } + window_id := job.executor.all_after_first('tmux.${session_id}') + if window_id.len == 0 { + window := session.window_new( + cmd: command + reset: true + ) or { return error('Failed to create window: ${err}') } + } else { + pane_id := job.executor.all_after_first('tmux.${session_id}.${window_id}').trim_string_left('.') + if pane_id.len == 0 { + mut window := session.window_new( + name: window_id.trim_string_left('.') + reset: true + ) or { return error('Failed to create window: ${err}') } + window.pane_split( + cmd: command + ) or { return error('Failed to start window: ${err}') } + } else { + mut window := session.window_new( + name: window_id.trim_string_left('.') + cmd: command + reset: true + ) or { return error('Failed to create window: ${err}') } + } + } + } else { + return error('Unknown executor: ${job.executor}') + } + update_job_status(mut r.redis_conn, r.namespace, job.id, .finished)! + set_result(mut r.redis_conn, r.namespace, job.id, '')! + log.info('MyCustomActor "${r.name}": Job ${job.id} completed successfully') +} \ No newline at end of file diff --git a/src/runner_test.v b/src/runner_test.v new file mode 100644 index 0000000..c5e7518 --- /dev/null +++ b/src/runner_test.v @@ -0,0 +1,243 @@ +module runner + +import freeflowuniverse.herolib.core.redisclient +import freeflowuniverse.herolib.core.playbook {PlayBook} +import freeflowuniverse.herolib.baobab.engine { Engine, Context } + +__global ( + entries shared map[string]string +) + +// Mock actor implementation for testing +struct TestActor implements Actor { +pub: + name string = 'test_actor' +pub mut: + redis_conn redisclient.Redis +} + +// Implement the Actor interface +pub fn (mut actor TestActor) process_job(j job.Job) ! { + mut redis_conn := actor.redis_conn()! + // Update job status to started + job.update_status(mut redis_conn, j.id, .started) or { + return error('Failed to update job status to started: ${err}') + } + + // Run the job using the engine + result := actor.engine.run_in_context(j.script, + db_path: actor.db_path + caller_id: j.caller_id + context_id: j.context_id + ) or { + // Handle execution error + job.update_status(mut redis_conn, j.id, .error)! + job.set_error(mut redis_conn, j.id, '${err}')! + return err + } + + // Update job status to finished and set result + job.update_status(mut redis_conn, j.id, .finished) or { + return error('Failed to update job status to finished: ${err}') + } + job.set_result(mut redis_conn, j.id, result) or { + return error('Failed to set job result: ${err}') + } +} + +fn test_actor_interface_defaults() { + actor := TestActor{ + name: 'test_actor' + } + + // Test default values from interface + assert actor.name == 'test_actor' +} + +fn test_actor_queue_key() { + actor := TestActor{ + name: 'test_actor' + } + + assert actor.queue_key()! == 'runner:test_actor' +} + +// Mock player function for testing +fn mock_player(mut plbook PlayBook) ! { + // Simple test player that adds some content + action := plbook.get(filter:'entry.define')! + entries['entry'] = action.params.get!('entry')! +} + + +fn test_actor_run_job() { + mut e := Engine{ + players: [] + } + + // Register a simple test player + e.register_player(mock_player) or { panic('Failed to register player: ${err}') } + + actor := TestActor{ + id: 'test_runner' + db_path: '/tmp/test_run.db' + engine: e + } + + // Create a test job + test_job := job.new( + caller_id: 'test_caller', + context_id: 'test_context', + script: 'test script', + script_type: .v + ) + + // Run the job + result := actor.run_job(test_job) or { panic('Failed to run job: ${err}') } + + assert result.len > 0 +} + +fn test_actor_run_job_with_context() { + mut engine := Engine{ + players: [] + } + + // Register a player that uses context + engine.register_player(fn (mut plbook playbook.PlayBook) ! { + // This player might access context variables + plbook.add_result('Context-aware execution') + }) or { panic('Failed to register context player: ${err}') } + + actor := TestActor{ + id: 'context_actor' + db_path: '/tmp/context_test.db' + engine: engine + } + + // Create a job with specific context + test_job := job.Job{ + id: 'context_job_1' + caller_id: 'context_caller' + context_id: 'context_123' + script: 'context_script' + script_type: .osis + status: .dispatched + // ... other fields with defaults + } + + result := actor.run_job(test_job) or { panic('Failed to run context job: ${err}') } + + assert result.len > 0 +} + +fn test_actor_process_job_success() { + mut e := Engine{ + players: [] + } + + // Register a successful player + e.register_player(fn (mut plbook playbook.PlayBook) ! { + plbook.add_result('Success!') + }) or { panic('Failed to register success player: ${err}') } + + actor := TestActor{ + id: 'success_actor' + engine: e + } + + // Create test job + test_job := job.new_job('success_caller', 'success_context', 'success script', .v) + + // Process the job + actor.process_job(test_job) or { panic('Failed to process job: ${err}') } + + // Process the job + actor.process_job(test_job, mut mock_redis) or { panic('Failed to process job: ${err}') } + + // Verify Redis operations were called + assert mock_redis.operations.len > 0 + + // Check that status was updated to started and then finished + job_key := 'hero:job:${test_job.id}' + assert mock_redis.job_status[job_key] == 'finished' + assert mock_redis.job_results[job_key].len > 0 +} + +fn test_actor_process_job_error() { + mut engine := Engine{ + players: [] + } + + // Register a failing player + engine.register_player(fn (mut plbook playbook.PlayBook) ! { + return error('Test error') + }) or { panic('Failed to register failing player: ${err}') } + + actor := TestActor{ + id: 'error_actor' + engine: engine + } + + // Create test job + test_job := job.new_job('error_caller', 'error_context', 'error script', .v) + + // Mock Redis connection + mut mock_redis := MockRedisConn{ + operations: [] + job_status: map[string]string{} + job_results: map[string]string{} + job_errors: map[string]string{} + } + + // Process the job (should handle error gracefully) + if result := actor.process_job(test_job, mut mock_redis) { + panic('Expected job processing to fail') + } + + // Verify error was recorded + job_key := 'hero:job:${test_job.id}' + assert mock_redis.job_status[job_key] == 'error' + assert mock_redis.job_errors[job_key].len > 0 +} + +fn test_multiple_actors() { + mut engine1 := Engine{ players: [] } + mut engine2 := Engine{ players: [] } + + engine1.register_player(fn (mut plbook playbook.PlayBook) ! { + plbook.add_result('Actor 1 result') + }) or { panic('Failed to register player 1: ${err}') } + + engine2.register_player(fn (mut plbook playbook.PlayBook) ! { + plbook.add_result('Actor 2 result') + }) or { panic('Failed to register player 2: ${err}') } + + actor1 := TestActor{ + id: 'actor_1' + engine: engine1 + } + + actor2 := TestActor{ + id: 'actor_2' + engine: engine2 + } + + // Test that actors have different queue keys + queue1 := actor1.queue_key() or { panic('Failed to get queue key 1: ${err}') } + queue2 := actor2.queue_key() or { panic('Failed to get queue key 2: ${err}') } + + assert queue1 != queue2 + assert queue1.contains('actor_1') + assert queue2.contains('actor_2') + + // Test that actors can run jobs independently + job1 := job.new_job('caller1', 'context1', 'script1', .v) + job2 := job.new_job('caller2', 'context2', 'script2', .osis) + + result1 := actor1.run_job(job1) or { panic('Failed to run job 1: ${err}') } + result2 := actor2.run_job(job2) or { panic('Failed to run job 2: ${err}') } + + assert result1.len > 0 + assert result2.len > 0 +} diff --git a/v.mod b/v.mod new file mode 100644 index 0000000..59c6e64 --- /dev/null +++ b/v.mod @@ -0,0 +1,7 @@ +Module { + name: 'runner' + description: 'hero runner in v' + version: '0.0.0' + license: 'MIT' + dependencies: [] +}