initial commit

This commit is contained in:
Timur Gordon
2025-08-26 14:47:52 +02:00
commit 010ecc7c71
18 changed files with 1543 additions and 0 deletions

8
.editorconfig Normal file
View File

@@ -0,0 +1,8 @@
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
[*.v]
indent_style = tab

8
.gitattributes vendored Normal file
View File

@@ -0,0 +1,8 @@
* text=auto eol=lf
*.bat eol=crlf
*.v linguist-language=V
*.vv linguist-language=V
*.vsh linguist-language=V
v.mod linguist-language=V
.vdocignore linguist-language=ignore

26
.gitignore vendored Normal file
View File

@@ -0,0 +1,26 @@
# Binaries for programs and plugins
main
runner_v
*.exe
*.exe~
*.so
*.dylib
*.dll
# Ignore binary output folders
bin/
# Ignore common editor/system specific metadata
.DS_Store
.idea/
.vscode/
*.iml
# ENV
.env
# vweb and database
*.db
*.js
cmd/runner

280
README.md Normal file
View File

@@ -0,0 +1,280 @@
# Hero Runner V
A V language implementation of the Hero Baobab runner system for distributed job execution. This runner provides a Redis-based job queue system with support for various executors including tmux sessions.
## Overview
The Hero Runner V is a lightweight, high-performance job execution system built in V lang. It connects to Redis for job queuing and supports multiple execution environments through configurable executors.
## Features
- **Redis-based Job Queue**: Reliable job queuing and status tracking
- **Multiple Executors**: Support for tmux sessions, windows, and panes
- **Job Lifecycle Management**: Complete job status tracking (dispatched → started → finished/error)
- **Configurable Timeouts**: Per-job timeout configuration
- **Environment Variables**: Job-specific environment variable support
- **Namespace Support**: Multi-tenant runner organization
- **CLI Interface**: Command-line interface with flag parsing
## Architecture
```text
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Redis Queue │ │ Runner │ │ Executor │
│ │ │ │ │ │
│ Job Storage │◄──►│ Job Processor │◄──►│ tmux.session │
│ Status Updates │ │ Status Manager │ │ tmux.window │
│ │ │ │ │ tmux.pane │
└─────────────────┘ └─────────────────┘ └─────────────────┘
```
## Installation
### Prerequisites
- V language compiler
- Redis server
- Herolib dependencies
### Quick Install
```bash
# Run the installation script
./scripts/install.sh
```
This will:
1. Install V language and Herolib
2. Set up module dependencies
3. Link the runner module to vmodules
### Manual Installation
```bash
# Install V & Herolib
curl 'https://raw.githubusercontent.com/freeflowuniverse/herolib/refs/heads/development/install_v.sh' > /tmp/install_v.sh
bash /tmp/install_v.sh --analyzer --herolib
# Install herolib
cd ${HOME}/code/github/freeflowuniverse/herolib
bash install_herolib.vsh
# Link runner module
mkdir -p "${HOME}/.vmodules/herocode"
ln -s "/path/to/runner_v/src" "${HOME}/.vmodules/herocode/runner"
```
## Usage
### Starting the Runner
```bash
# Start with default settings
./scripts/run.sh
# Or run directly with custom options
./cmd/runner.vsh --name my_runner --namespace production --redis_url redis://localhost:6379
```
### Command Line Options
```bash
Usage: runner [flags]
Flags:
-n --name name of the runner (default: test_runner)
-s --namespace namespace of the runner (default: '')
-r --redis_url redis url (default: 127.0.0.1:6379)
-h --help Show help message
```
### Creating and Dispatching Jobs
```v
import herocode.runner
import freeflowuniverse.herolib.core.redisclient
import time
// Create a job
job := runner.new_job(runner.Job{
caller_id: 'my_app'
context_id: 'task_123'
runner: 'my_runner'
executor: 'tmux.session1.window1'
payload: 'echo "Hello World" && sleep 5'
timeout: 30
})!
// Connect to Redis and store the job
mut redis_conn := redisclient.new('127.0.0.1:6379')!
job.store_in_redis(mut redis_conn, 'production:')!
// Add job to runner queue
runner_queue_key := 'production:runner:my_runner'
mut queue := redis_conn.queue_get(runner_queue_key)
queue.add(job.id)!
```
## Job Structure
Jobs contain the following fields:
```v
pub struct Job {
pub mut:
id string // Unique job identifier
caller_id string // ID of the calling service
context_id string // Context/session identifier
runner string // Target runner name
executor string // Execution environment (e.g., tmux.session1)
payload string // Script/command to execute
status JobStatus // Current job status
result string // Job execution result
error string // Error message if failed
timeout int // Timeout in seconds
env_vars map[string]string // Environment variables
created_at time.Time // Job creation timestamp
updated_at time.Time // Last update timestamp
}
```
### Job Status Lifecycle
```text
dispatched → started → finished
↓ ↓ ↑
└─────── error ──────┘
```
- **dispatched**: Job is queued and waiting for processing
- **started**: Job execution has begun
- **finished**: Job completed successfully
- **error**: Job failed during execution
## Executor Types
The runner supports various executor types for different execution environments:
### Tmux Executors
- `tmux.session_name` - Execute in a tmux session
- `tmux.session_name.window_name` - Execute in a specific window
- `tmux.session_name.window_name.pane_id` - Execute in a specific pane
Example:
```v
job := runner.Job{
// ... other fields
executor: 'tmux.dev_session.main_window.pane1'
payload: 'npm run build && npm test'
}
```
## Examples
### Basic Runner
```v
#!/usr/bin/env -S v run
import herocode.runner
import freeflowuniverse.herolib.core.redisclient
mut r := &runner.Runner{
name: 'basic_runner'
namespace: 'development'
redis_conn: redisclient.new('127.0.0.1:6379')!
}
// Start the runner
spawn r.run()
// Keep running
for {}
```
### Job Submission Script
See `examples/test_runner_rpc.vsh` for a complete example of submitting multiple jobs to a runner.
## Testing
Run the test suite:
```bash
# Run all tests
v test src/
# Run specific test files
v test src/runner_test.v
v test src/job_test.v
```
## Configuration
### Environment Variables
- `REDIS_URL`: Redis connection URL (default: 127.0.0.1:6379)
- `RUNNER_NAME`: Default runner name
- `RUNNER_NAMESPACE`: Default namespace
### Redis Keys
The runner uses the following Redis key patterns:
- Jobs: `{namespace}job_{id}`
- Runner Queues: `{namespace}runner:{runner_name}`
- Status Updates: Stored within job objects
## Development
### Project Structure
```
runner_v/
├── cmd/ # Command-line applications
│ └── runner.vsh # Main runner executable
├── src/ # Source code
│ ├── runner.v # Core runner implementation
│ ├── job.v # Job structure and operations
│ ├── factory.v # Job creation utilities
│ └── *_test.v # Test files
├── examples/ # Usage examples
├── scripts/ # Build and deployment scripts
└── README.md # This file
```
### Building
```bash
# Build the runner
v -prod cmd/runner.vsh -o runner
# Run in development mode
v run cmd/runner.vsh
```
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Run the test suite
6. Submit a pull request
## License
MIT License - see LICENSE file for details.
## Related Projects
- [Herolib](https://github.com/freeflowuniverse/herolib) - Core utilities and libraries
- [Hero Baobab](https://github.com/freeflowuniverse/baobab) - Distributed actor system
## Support
For issues and questions:
- Open an issue on GitHub
- Check the examples directory for usage patterns
- Review the test files for implementation details

47
cmd/runner.vsh Executable file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run
import os
import time
import flag
import herocode.runner
import freeflowuniverse.herolib.core.redisclient
mut fp := flag.new_flag_parser(os.args)
fp.application('runner')
fp.version('v0.1.0')
fp.description('Runner in Vlang')
fp.skip_executable()
// Define the flags
runner_name := fp.string('name', `n`, 'test_runner', 'name of the runner')
namespace := fp.string('namespace', `s`, '', 'namespace of the runner')
redis_url := fp.string('redis_url', `r`, '127.0.0.1:6379', 'redis url')
help_requested := fp.bool('help', `h`, false, 'Show help message')
// Parse the arguments
remaining_args := fp.finalize() or {
eprintln('Error parsing arguments: ${err}')
println(fp.usage())
exit(1)
}
if help_requested {
println(fp.usage())
exit(0)
}
mut r := &runner.Runner{
name: runner_name
namespace: namespace
redis_conn: redisclient.new(
if redis_url.len > 0 {
redis_url
} else {
'127.0.0.1:6379'
}
)!
}
spawn r.run()
for {}

BIN
examples/runner Executable file

Binary file not shown.

34
examples/runner.vsh Executable file
View File

@@ -0,0 +1,34 @@
#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run
import time
import freeflowuniverse.herolib.baobab.runner
import freeflowuniverse.herolib.core.redisclient
const namespace = ''
mut r := &runner.Runner{
name: 'test_runner'
redis_conn: redisclient.new('127.0.0.1:6379')!
}
spawn r.run()
// job := runner.Job{
// id: 'test_job_1'
// caller_id: 'test_caller'
// context_id: 'test_context_1'
// runner: 'test_runner'
// executor: 'tmux.session1'
// payload: 'sleep 10\necho "Hello from job 1"\nsleep 10'
// status: .dispatched
// timeout: 30
// created_at: time.now()
// updated_at: time.now()
// }
// mut redis_conn := redisclient.new('127.0.0.1:6379')!
// job.store_in_redis(mut redis_conn, namespace)!
// mut runner_q := redis_conn.queue_get(r.queue_key()!)
// runner_q.add(job.id)!
for {}

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run
import freeflowuniverse.herolib.core.redisclient
import freeflowuniverse.herolib.baobab.runner
import time
import log
// Test script to send RPC calls over Redis to test the runner
fn main() {
log.info('Starting runner RPC test script')
// Connect to Redis
mut redis := redisclient.new('127.0.0.1:6379')!
// Test runner configuration
runner_name := 'test_runner'
namespace := ''
// Create test jobs
test_jobs := [
runner.Job{
id: 'test_job_1'
caller_id: 'test_caller'
context_id: 'test_context_1'
runner: runner_name
executor: 'tmux.session1'
payload: 'echo "Hello from job 1"'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
},
runner.Job{
id: 'test_job_2'
caller_id: 'test_caller'
context_id: 'test_context_2'
runner: runner_name
executor: 'tmux.session2.window1'
payload: 'ls -la && echo "Job 2 completed"'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
},
runner.Job{
id: 'test_job_3'
caller_id: 'test_caller'
context_id: 'test_context_3'
runner: runner_name
executor: 'tmux.session3.window1.pane1'
payload: 'date && echo "Current time from job 3"'
status: .dispatched
timeout: 30
created_at: time.now()
updated_at: time.now()
}
]
log.info('Storing ${test_jobs.len} test jobs in Redis and dispatching to runner queue')
// Store jobs in Redis and dispatch them to the runner queue
for job in test_jobs {
// Store job data in Redis
job.store_in_redis(mut redis, namespace) or {
log.error('Failed to store job ${job.id}: ${err}')
continue
}
// Dispatch job to runner queue by pushing job ID to the queue
queue_key := if namespace.len > 0 {
"${namespace}:runner:${runner_name}"
} else {
"runner:${runner_name}"
}
redis.rpush(queue_key, job.id) or {
log.error('Failed to dispatch job ${job.id} to queue ${queue_key}: ${err}')
continue
}
log.info('Dispatched job ${job.id} to queue ${queue_key}')
// Small delay between jobs
time.sleep(1 * time.second)
}
log.info('All test jobs dispatched. Monitoring job status...')
// Monitor job status for a while
for i in 0..30 { // Monitor for 30 seconds
log.info('--- Status check ${i + 1} ---')
for job in test_jobs {
loaded_job := runner.load_from_redis(mut redis, job.id, namespace) or {
log.error('Failed to load job ${job.id}: ${err}')
continue
}
log.info('Job ${loaded_job.id}: status=${loaded_job.status}, result="${loaded_job.result}", error="${loaded_job.error}"')
}
time.sleep(1 * time.second)
}
log.info('Test completed. Check tmux sessions to see if commands were executed.')
log.info('You can run "tmux list-sessions" to see created sessions.')
}

18
scripts/install.sh Executable file
View File

@@ -0,0 +1,18 @@
#!/bin/bash
# Install V & Herolib
curl 'https://raw.githubusercontent.com/freeflowuniverse/herolib/refs/heads/development/install_v.sh' > /tmp/install_v.sh
bash /tmp/install_v.sh --herolib
pushd ${HOME}/code/github/freeflowuniverse/herolib
./install_herolib.vsh
popd
# Install runner
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
MODULE_DIR="${SCRIPT_DIR}/../"
# Link module to vmodules
mkdir -p "${HOME}/.vmodules/herocode"
unlink ${HOME}/.vmodules/herocode/runner
ln -s ${MODULE_DIR}/src ${HOME}/.vmodules/herocode/runner

6
scripts/run.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/bin/bash
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
MODULE_DIR="${SCRIPT_DIR}/../"
${MODULE_DIR}/cmd/runner.vsh

10
scripts/test.sh Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
# serve.sh - Build optimized WASM and serve with Caddy + Brotli compression
set -e
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
SOURCE_DIR="${SCRIPT_DIR}/../src"
pushd ${SOURCE_DIR}
vtest
popd

269
src/README.md Normal file
View File

@@ -0,0 +1,269 @@
# V Lang Actor Interface
This module provides a V lang port of the Rust actor trait interface for the Hero Baobab system. It enables implementing actors in V lang with a simple interface: implement `process_job` and use `spawn` to run the actor.
## Architecture
The V lang actor interface mirrors the Rust implementation with these key components:
```text
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ SyncActor │ │ AsyncActor │ │ MyCustomActor │
│ │ │ │ │ │
│ process_job() │ │ process_job() │ │ process_job() │
│ (sequential) │ │ (concurrent) │ │ (custom logic) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└───────────────┬───────────────────────────────┘
┌───────▼───────┐
│ Actor Interface│
│ │
│ spawn_actor() │
│ process_job() │
│ config │
└───────────────┘
```
## Core Components
### Actor Interface
All actors must implement the `Actor` interface:
```v
pub interface Actor {
// Process a single job - must be implemented by concrete actors
process_job(job Job, mut redis_conn redisclient.Redis) !
// Get the actor type identifier
actor_type() string
// Get the actor ID
actor_id() string
// Get the Redis URL
redis_url() string
// Get the database path
db_path() string
// Check if tasks should be preserved
preserve_tasks() bool
}
```
### ActorConfig
Configuration structure for all actors:
```v
pub struct ActorConfig {
pub:
actor_id string
db_path string
redis_url string
preserve_tasks bool
default_timeout ?time.Duration // Optional timeout for async actors
}
```
### Job Structure
Jobs processed by actors:
```v
pub struct Job {
pub mut:
id string
caller_id string
context_id string
script string
status JobStatus
result string
error string
created_at time.Time
started_at time.Time
finished_at time.Time
}
```
## Built-in Actor Implementations
### SyncActor
Processes jobs sequentially, one at a time:
```v
import freeflowuniverse.herolib.lib.baobab.actor
// Create configuration
config := actor.new_actor_config(
'sync_actor_1',
'/path/to/database',
'redis://localhost:6379',
false
)
// Create and spawn sync actor
sync_actor := actor.new_sync_actor(config)!
mut shutdown_chan := chan bool{cap: 1}
go actor.spawn_actor(sync_actor, mut shutdown_chan)
// Later, shutdown the actor
shutdown_chan <- true
```
### AsyncActor
Processes jobs concurrently with timeout support:
```v
import time
import freeflowuniverse.herolib.lib.baobab.actor
// Create configuration with timeout
mut config := actor.new_actor_config(
'async_actor_1',
'/path/to/database',
'redis://localhost:6379',
false
)
config = config.with_default_timeout(time.Duration(300 * time.second))
// Create and spawn async actor
async_actor := actor.new_async_actor(config)!
mut shutdown_chan := chan bool{cap: 1}
go actor.spawn_actor(async_actor, mut shutdown_chan)
// Later, shutdown the actor
shutdown_chan <- true
```
## Creating Custom Actors
To implement a custom actor, simply implement the `Actor` interface:
```v
import freeflowuniverse.herolib.lib.baobab.actor
struct MyCustomActor {
pub:
config actor.ActorConfig
mut:
engine rhai.Engine
// Add your custom fields here
}
pub fn new_custom_actor(config actor.ActorConfig) !MyCustomActor {
mut engine := rhai.new_engine()!
return MyCustomActor{
config: config
engine: engine
}
}
// Implement the Actor interface
pub fn (mut actor MyCustomActor) process_job(job actor.Job, mut redis_conn redisclient.Redis) ! {
// Your custom job processing logic here
// Update job status to started
actor.update_job_status(mut redis_conn, job.id, .started)!
// Execute the script (or your custom logic)
result := actor.execute_job_with_engine(mut actor.engine, job, actor.config.db_path)!
// Update job status to finished and set result
actor.update_job_status(mut redis_conn, job.id, .finished)!
actor.set_job_result(mut redis_conn, job.id, result)!
// Clean up if needed
actor.cleanup_job(mut redis_conn, job.id, job.context_id, actor.config.preserve_tasks)!
}
// Implement required interface methods
pub fn (actor MyCustomActor) actor_type() string { return 'MyCustomActor' }
pub fn (actor MyCustomActor) actor_id() string { return actor.config.actor_id }
pub fn (actor MyCustomActor) redis_url() string { return actor.config.redis_url }
pub fn (actor MyCustomActor) db_path() string { return actor.config.db_path }
pub fn (actor MyCustomActor) preserve_tasks() bool { return actor.config.preserve_tasks }
// Usage
config := actor.new_actor_config('my_actor', '/db/path', 'redis://localhost:6379', false)
custom_actor := new_custom_actor(config)!
mut shutdown_chan := chan bool{cap: 1}
go actor.spawn_actor(custom_actor, mut shutdown_chan)
```
## Key Features
### Unified Interface
- Same `spawn_actor()` function works with all actor implementations
- Consistent configuration and lifecycle management
- Clean separation between actor logic and infrastructure
### Redis Integration
- Automatic Redis connection management
- Job polling from actor-specific queues (`hero:job:actor_queue:{actor_id}`)
- Job status tracking and result storage
- Optional job cleanup based on `preserve_tasks` setting
### Script Execution
- Rhai script engine integration
- Automatic job context setup (DB_PATH, CALLER_ID, CONTEXT_ID)
- Error handling and status updates
### Graceful Shutdown
- Channel-based shutdown signaling
- Proper cleanup of resources
- Support for cancelling running jobs (AsyncActor)
## Job Processing Flow
1. **Job Polling**: Actor polls Redis queue using BLPOP
2. **Job Loading**: Load job details from Redis hash
3. **Status Update**: Mark job as "started"
4. **Script Execution**: Execute Rhai script with job context
5. **Result Storage**: Store result or error in Redis
6. **Status Update**: Mark job as "finished" or "error"
7. **Cleanup**: Optionally remove job from Redis
## Error Handling
All operations include comprehensive error handling:
- Redis connection failures
- Job loading errors
- Script execution errors
- Timeout handling (AsyncActor)
- Graceful degradation and logging
## Migration from Rust
This V lang implementation provides the same functionality as the Rust version:
- ✅ Actor trait abstraction
- ✅ Sync and async actor implementations
- ✅ Unified spawn interface
- ✅ Redis job queue integration
- ✅ Rhai script execution
- ✅ Job lifecycle management
- ✅ Error handling and logging
- ✅ Graceful shutdown
## Dependencies
The actor interface depends on these herolib modules:
- `freeflowuniverse.herolib.clients.redisclient` - Redis operations
- `freeflowuniverse.herolib.data.rhai` - Script execution
- `freeflowuniverse.herolib.core.base` - Base utilities
## Usage Summary
To implement an actor in V lang:
1. **Create ActorConfig**: Configure actor ID, database path, Redis URL, etc.
2. **Implement Actor Interface**: Create a struct that implements the `Actor` interface
3. **Implement process_job()**: Add your job processing logic
4. **Use spawn()**: Call `spawn_actor()` to start the actor loop
That's it! The interface handles all the Redis polling, job management, and infrastructure concerns automatically.

27
src/factory.v Normal file
View File

@@ -0,0 +1,27 @@
module runner
import freeflowuniverse.herolib.core.redisclient
import time
// Build the job from the current builder configuration
pub fn new_job(job Job) !Job {
// Validate required fields
if job.caller_id.len == 0 {
return error('caller_id is required')
}
if job.context_id.len == 0 {
return error('context_id is required')
}
if job.payload.len == 0 {
return error('payload is required')
}
return Job{
...job
id: if job.id.len == 0 { generate_job_id() } else { job.id }
created_at: time.now()
timeout: if job.timeout == 0 { 300 } else { job.timeout }
}
}

134
src/job.v Normal file
View File

@@ -0,0 +1,134 @@
module runner
import rand
import time
import json
import freeflowuniverse.herolib.core.redisclient
// Job status enumeration
pub enum JobStatus {
dispatched
started
finished
error
}
// Representation of a script execution request
// This structure contains all the information needed to execute a script
// on a actor service, including the script content, dependencies, and metadata
pub struct Job {
pub mut:
id string
caller_id string
context_id string
runner string
executor string
payload string
status JobStatus
result string
error string
timeout int
env_vars map[string]string
created_at time.Time
updated_at time.Time
}
// Generate a unique job ID
fn generate_job_id() string {
// Simple UUID-like ID generation
now := time.now()
timestamp := now.unix()
random_part := rand.int_in_range(1000, 9999) or { 1234 }
return 'job_${timestamp}_${random_part}'
}
// Store this job in Redis
pub fn (job Job) store_in_redis(mut conn redisclient.Redis, namespace string) ! {
job_key := '${namespace}${job.id}'
// Store job as JSON
job_json := json.encode(job)
conn.set(job_key, job_json) or { return error('Failed to store job ${job.id}: ${err}') }
}
// Load a job from Redis by ID
pub fn load_from_redis(mut conn redisclient.Redis, job_id string, namespace string) !Job {
job_key := '${namespace}${job_id}'
// Get job JSON from Redis
job_json := conn.get(job_key) or {
return error('Failed to load job ${job_id} from Redis: ${err}')
}
if job_json.len == 0 {
return error('Job ${job_id} not found in Redis')
}
// Parse job from JSON
job := json.decode(Job, job_json) or {
return error('Failed to decode job ${job_id} JSON: ${err}')
}
return job
}
// Update job status in Redis
pub fn update_job_status(mut conn redisclient.Redis, namespace string, job_id string, status JobStatus) ! {
// Load job, update status, and store back
mut job := load_from_redis(mut conn, job_id, namespace)!
job.status = status
job.updated_at = time.now()
job.store_in_redis(mut conn, namespace)!
}
// Get job status from Redis
pub fn get_status(mut conn redisclient.Redis, job_id string, namespace string) !JobStatus {
job := load_from_redis(mut conn, job_id, namespace)!
return job.status
}
// Set job result in Redis
pub fn set_result(mut conn redisclient.Redis, namespace string, job_id string, result string) ! {
// Load job, update result, and store back
mut job := load_from_redis(mut conn, job_id, namespace)!
job.result = result
job.updated_at = time.now()
job.store_in_redis(mut conn, namespace)!
}
// Set job error in Redis
pub fn set_error(mut conn redisclient.Redis, namespace string, job_id string, error_msg string) ! {
// Load job, update error, and store back
mut job := load_from_redis(mut conn, job_id, namespace)!
job.error = error_msg
job.status = .error
job.updated_at = time.now()
job.store_in_redis(mut conn, namespace)!
}
// Delete job from Redis
pub fn delete_from_redis(mut conn redisclient.Redis, job_id string, namespace string) ! {
job_key := '${namespace}${job_id}'
conn.del(job_key) or {
return error('Failed to delete job ${job_id} from Redis: ${err}')
}
}
// List all job IDs from Redis
pub fn list_all_job_ids(mut conn redisclient.Redis, namespace string) ![]string {
pattern := '${namespace}*'
keys := conn.keys(pattern) or {
return error('Failed to list job keys: ${err}')
}
mut job_ids := []string{}
for key in keys {
// Extract job ID from key (remove namespace prefix)
if key.starts_with(namespace) {
job_id := key[namespace.len..]
job_ids << job_id
}
}
return job_ids
}

194
src/job_test.v Normal file
View File

@@ -0,0 +1,194 @@
module runner
import time
fn test_script_type_conversion() {
// Test ScriptType to string conversion
assert ScriptType.osis.str() == 'osis'
assert ScriptType.sal.str() == 'sal'
assert ScriptType.v.str() == 'v'
assert ScriptType.python.str() == 'python'
}
fn test_job_status_conversion() {
assert JobStatus.dispatched.str() == 'dispatched'
assert JobStatus.started.str() == 'started'
assert JobStatus.finished.str() == 'finished'
assert JobStatus.error.str() == 'error'
}
fn test_script_type_from_string() {
assert 'osis' == ScriptType.osis.str()
assert 'sal' == ScriptType.sal.str()
assert 'v' == ScriptType.v.str()
assert 'python' == ScriptType.python.str()
// Test invalid string
if result := ScriptType.from_string('invalid') {
panic('Should fail for invalid script type')
}
}
fn test_job_status_from_string() {
assert 'dispatched' == JobStatus.dispatched.str()
assert 'started' == JobStatus.started.str()
assert 'finished' == JobStatus.finished.str()
assert 'error' == JobStatus.error.str()
// Test invalid string
if result := JobStatus.from_string('invalid') {
panic('Should fail for invalid job status')
}
}
fn test_script_type_actor_queue_suffix() {
assert ScriptType.osis.actor_queue_suffix() == 'osis'
assert ScriptType.sal.actor_queue_suffix() == 'sal'
assert ScriptType.v.actor_queue_suffix() == 'v'
assert ScriptType.python.actor_queue_suffix() == 'python'
}
fn test_new_job_creation()! {
// Test basic job creation using the old function
job := new_job(
caller_id: 'test_caller'
context_id: 'test_context'
script: 'print("hello")'
script_type: ScriptType.osis
)!
assert job.caller_id == 'test_caller'
assert job.context_id == 'test_context'
assert job.script == 'print("hello")'
assert job.script_type == ScriptType.osis
assert job.status == JobStatus.dispatched
assert job.timeout == 300 // default timeout
assert job.retries == 0
assert job.concurrent == false
assert job.id.len > 0 // should have generated an ID
}
fn test_job_factory() {
// Test job creation using factory with custom fields
mut job_template := Job{
caller_id: 'test_caller'
context_id: 'test_context'
script: 'print("test")'
script_type: .v
timeout: 60
retries: 3
concurrent: true
env_vars: {'TEST_VAR': 'test_value'}
prerequisites: ['job_1']
dependents: ['job_2']
}
job := new_job(job_template) or { panic('Failed to create job: ${err}') }
assert job.caller_id == 'test_caller'
assert job.context_id == 'test_context'
assert job.script == 'print("test")'
assert job.script_type == ScriptType.v
assert job.timeout == 60
assert job.retries == 3
assert job.concurrent == true
assert job.env_vars['TEST_VAR'] == 'test_value'
assert job.prerequisites.len == 1
assert job.prerequisites[0] == 'job_1'
assert job.dependents.len == 1
assert job.dependents[0] == 'job_2'
assert job.id.len > 0 // should have generated an ID
assert job.created_at != time.Time{} // should have set creation time
}
fn test_job_factory_validation() {
// Test missing caller_id
mut invalid_job := Job{
context_id: 'test_context'
script: 'test_script'
script_type: .v
}
if result := new_job(invalid_job) {
panic('Should fail without caller_id')
}
// Test missing context_id
invalid_job = Job{
caller_id: 'test_caller'
script: 'test_script'
script_type: .v
}
if result := new_job(invalid_job) {
panic('Should fail without context_id')
}
// Test missing script
invalid_job = Job{
caller_id: 'test_caller'
context_id: 'test_context'
script_type: .v
}
if result := new_job(invalid_job) {
panic('Should fail without script')
}
}
fn test_job_factory_with_env_vars() {
mut job_template := Job{
caller_id: 'test_caller'
context_id: 'test_context'
script: 'test_script'
script_type: .v
env_vars: {
'KEY1': 'value1'
'KEY2': 'value2'
'KEY3': 'value3'
}
}
job := new_job(job_template) or { panic('Failed to create job with env vars: ${err}') }
assert job.env_vars.len == 3
assert job.env_vars['KEY1'] == 'value1'
assert job.env_vars['KEY2'] == 'value2'
assert job.env_vars['KEY3'] == 'value3'
}
fn test_job_factory_with_dependencies() {
mut job_template := Job{
caller_id: 'test_caller'
context_id: 'test_context'
script: 'test_script'
script_type: .v
prerequisites: ['job1', 'job2']
dependents: ['job3', 'job4']
}
job := new_job(job_template) or { panic('Failed to create job with dependencies: ${err}') }
assert job.prerequisites.len == 2
assert job.prerequisites.contains('job1')
assert job.prerequisites.contains('job2')
assert job.dependents.len == 2
assert job.dependents.contains('job3')
assert job.dependents.contains('job4')
}
fn test_job_factory_with_custom_id() {
mut job_template := Job{
id: 'custom_job_id'
caller_id: 'test_caller'
context_id: 'test_context'
script: 'test_script'
script_type: .v
}
job := new_job(job_template) or { panic('Failed to create job with custom ID: ${err}') }
assert job.id == 'custom_job_id'
assert job.created_at != time.Time{} // should still set creation time
}

125
src/runner.v Normal file
View File

@@ -0,0 +1,125 @@
module runner
import log
import freeflowuniverse.herolib.core.redisclient
import freeflowuniverse.herolib.osal.tmux
// ActorConfig holds configuration for actor instances
pub struct RunnerConfig {
pub:
name string
namespace string
redis_url string
}
// Runner interface defines the common behavior for all actor implementations
pub struct Runner {
pub:
name string
namespace string
pub mut:
redis_conn &redisclient.Redis
}
// Constants for Redis operations
const blpop_timeout_seconds = 5
pub fn (r Runner) queue_key() ! string {
return if r.namespace.len > 0 {
"${r.namespace}:runner:${r.name}"
} else {
"runner:${r.name}"
}
}
// Spawn an actor with the trait-based interface
// This provides the common actor loop implementation that all actors can use
pub fn (mut r Runner) run() ! {
log.info('Starting ${r.name} actor')
// Create channel for job processing
job_chan := chan string{cap: 1}
spawn r.listen_for_jobs(job_chan)
// Main actor loop
for {
job_id := <- job_chan or {
log.info('Channel closed')
break
}
job := load_from_redis(mut r.redis_conn, job_id, r.namespace) or {
log.error('Actor "${r.name}" failed to load job ${job_id}: ${err}')
continue
}
r.run_job(job) or {
log.error('Actor "${r.name}" failed to process job ${job.id}: ${err}')
continue
}
}
log.info('Actor "${r.name}" shutting down')
}
pub fn (mut r Runner) listen_for_jobs(channel chan string) ! {
for {
queue_key := r.queue_key()!
result := r.redis_conn.blpop([queue_key], 5) or {
continue
}
// blpop returns [queue_key, job_id], we want the job_id (index 1)
if result.len > 1 {
channel <- result[1]
}
}
}
// Implementation of Actor interface for MyCustomActor
pub fn (mut r Runner) run_job(job Job) ! {
log.info('MyCustomActor "${r.name}": Processing job ${job.id} with custom logic')
command := "hero -s ${job.payload}"
// Update job status to started
update_job_status(mut r.redis_conn, r.namespace, job.id, .started)!
if job.executor.starts_with('tmux') {
session_id := job.executor.all_after_first('tmux')
mut t := tmux.Tmux{
sessionid: session_id.trim_string_left('.')
}
mut session := t.session_create(
name: session_id.trim_string_left('.'),
reset: true
) or { return error('Failed to create session: ${err}') }
window_id := job.executor.all_after_first('tmux.${session_id}')
if window_id.len == 0 {
window := session.window_new(
cmd: command
reset: true
) or { return error('Failed to create window: ${err}') }
} else {
pane_id := job.executor.all_after_first('tmux.${session_id}.${window_id}').trim_string_left('.')
if pane_id.len == 0 {
mut window := session.window_new(
name: window_id.trim_string_left('.')
reset: true
) or { return error('Failed to create window: ${err}') }
window.pane_split(
cmd: command
) or { return error('Failed to start window: ${err}') }
} else {
mut window := session.window_new(
name: window_id.trim_string_left('.')
cmd: command
reset: true
) or { return error('Failed to create window: ${err}') }
}
}
} else {
return error('Unknown executor: ${job.executor}')
}
update_job_status(mut r.redis_conn, r.namespace, job.id, .finished)!
set_result(mut r.redis_conn, r.namespace, job.id, '')!
log.info('MyCustomActor "${r.name}": Job ${job.id} completed successfully')
}

243
src/runner_test.v Normal file
View File

@@ -0,0 +1,243 @@
module runner
import freeflowuniverse.herolib.core.redisclient
import freeflowuniverse.herolib.core.playbook {PlayBook}
import freeflowuniverse.herolib.baobab.engine { Engine, Context }
__global (
entries shared map[string]string
)
// Mock actor implementation for testing
struct TestActor implements Actor {
pub:
name string = 'test_actor'
pub mut:
redis_conn redisclient.Redis
}
// Implement the Actor interface
pub fn (mut actor TestActor) process_job(j job.Job) ! {
mut redis_conn := actor.redis_conn()!
// Update job status to started
job.update_status(mut redis_conn, j.id, .started) or {
return error('Failed to update job status to started: ${err}')
}
// Run the job using the engine
result := actor.engine.run_in_context(j.script,
db_path: actor.db_path
caller_id: j.caller_id
context_id: j.context_id
) or {
// Handle execution error
job.update_status(mut redis_conn, j.id, .error)!
job.set_error(mut redis_conn, j.id, '${err}')!
return err
}
// Update job status to finished and set result
job.update_status(mut redis_conn, j.id, .finished) or {
return error('Failed to update job status to finished: ${err}')
}
job.set_result(mut redis_conn, j.id, result) or {
return error('Failed to set job result: ${err}')
}
}
fn test_actor_interface_defaults() {
actor := TestActor{
name: 'test_actor'
}
// Test default values from interface
assert actor.name == 'test_actor'
}
fn test_actor_queue_key() {
actor := TestActor{
name: 'test_actor'
}
assert actor.queue_key()! == 'runner:test_actor'
}
// Mock player function for testing
fn mock_player(mut plbook PlayBook) ! {
// Simple test player that adds some content
action := plbook.get(filter:'entry.define')!
entries['entry'] = action.params.get!('entry')!
}
fn test_actor_run_job() {
mut e := Engine{
players: []
}
// Register a simple test player
e.register_player(mock_player) or { panic('Failed to register player: ${err}') }
actor := TestActor{
id: 'test_runner'
db_path: '/tmp/test_run.db'
engine: e
}
// Create a test job
test_job := job.new(
caller_id: 'test_caller',
context_id: 'test_context',
script: 'test script',
script_type: .v
)
// Run the job
result := actor.run_job(test_job) or { panic('Failed to run job: ${err}') }
assert result.len > 0
}
fn test_actor_run_job_with_context() {
mut engine := Engine{
players: []
}
// Register a player that uses context
engine.register_player(fn (mut plbook playbook.PlayBook) ! {
// This player might access context variables
plbook.add_result('Context-aware execution')
}) or { panic('Failed to register context player: ${err}') }
actor := TestActor{
id: 'context_actor'
db_path: '/tmp/context_test.db'
engine: engine
}
// Create a job with specific context
test_job := job.Job{
id: 'context_job_1'
caller_id: 'context_caller'
context_id: 'context_123'
script: 'context_script'
script_type: .osis
status: .dispatched
// ... other fields with defaults
}
result := actor.run_job(test_job) or { panic('Failed to run context job: ${err}') }
assert result.len > 0
}
fn test_actor_process_job_success() {
mut e := Engine{
players: []
}
// Register a successful player
e.register_player(fn (mut plbook playbook.PlayBook) ! {
plbook.add_result('Success!')
}) or { panic('Failed to register success player: ${err}') }
actor := TestActor{
id: 'success_actor'
engine: e
}
// Create test job
test_job := job.new_job('success_caller', 'success_context', 'success script', .v)
// Process the job
actor.process_job(test_job) or { panic('Failed to process job: ${err}') }
// Process the job
actor.process_job(test_job, mut mock_redis) or { panic('Failed to process job: ${err}') }
// Verify Redis operations were called
assert mock_redis.operations.len > 0
// Check that status was updated to started and then finished
job_key := 'hero:job:${test_job.id}'
assert mock_redis.job_status[job_key] == 'finished'
assert mock_redis.job_results[job_key].len > 0
}
fn test_actor_process_job_error() {
mut engine := Engine{
players: []
}
// Register a failing player
engine.register_player(fn (mut plbook playbook.PlayBook) ! {
return error('Test error')
}) or { panic('Failed to register failing player: ${err}') }
actor := TestActor{
id: 'error_actor'
engine: engine
}
// Create test job
test_job := job.new_job('error_caller', 'error_context', 'error script', .v)
// Mock Redis connection
mut mock_redis := MockRedisConn{
operations: []
job_status: map[string]string{}
job_results: map[string]string{}
job_errors: map[string]string{}
}
// Process the job (should handle error gracefully)
if result := actor.process_job(test_job, mut mock_redis) {
panic('Expected job processing to fail')
}
// Verify error was recorded
job_key := 'hero:job:${test_job.id}'
assert mock_redis.job_status[job_key] == 'error'
assert mock_redis.job_errors[job_key].len > 0
}
fn test_multiple_actors() {
mut engine1 := Engine{ players: [] }
mut engine2 := Engine{ players: [] }
engine1.register_player(fn (mut plbook playbook.PlayBook) ! {
plbook.add_result('Actor 1 result')
}) or { panic('Failed to register player 1: ${err}') }
engine2.register_player(fn (mut plbook playbook.PlayBook) ! {
plbook.add_result('Actor 2 result')
}) or { panic('Failed to register player 2: ${err}') }
actor1 := TestActor{
id: 'actor_1'
engine: engine1
}
actor2 := TestActor{
id: 'actor_2'
engine: engine2
}
// Test that actors have different queue keys
queue1 := actor1.queue_key() or { panic('Failed to get queue key 1: ${err}') }
queue2 := actor2.queue_key() or { panic('Failed to get queue key 2: ${err}') }
assert queue1 != queue2
assert queue1.contains('actor_1')
assert queue2.contains('actor_2')
// Test that actors can run jobs independently
job1 := job.new_job('caller1', 'context1', 'script1', .v)
job2 := job.new_job('caller2', 'context2', 'script2', .osis)
result1 := actor1.run_job(job1) or { panic('Failed to run job 1: ${err}') }
result2 := actor2.run_job(job2) or { panic('Failed to run job 2: ${err}') }
assert result1.len > 0
assert result2.len > 0
}

7
v.mod Normal file
View File

@@ -0,0 +1,7 @@
Module {
name: 'runner'
description: 'hero runner in v'
version: '0.0.0'
license: 'MIT'
dependencies: []
}