Compare commits

...

2 Commits

Author SHA1 Message Date
Lee Smet
fde456fd5e Add new supervisor client over mycelium
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-27 14:17:58 +02:00
Lee Smet
a8227eb808 Revert "Add supervisor API"
This reverts commit bc30c9cc89.
2025-08-27 12:52:54 +02:00
10 changed files with 851 additions and 1083 deletions

751
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -14,12 +14,11 @@ redis = { version = "0.32.5", features = [
"connection-manager", "connection-manager",
"aio", "aio",
] } ] }
jsonrpsee = { version = "0.26.0", features = [ jsonrpsee = { version = "0.26.0", features = ["server", "macros"] }
"server",
"macros",
"client",
"http-client",
"ws-client",
] }
http = "1.3.1"
async-trait = "0.1.83" async-trait = "0.1.83"
# HTTP client to call Mycelium JSON-RPC
reqwest = { version = "0.12.7", features = ["json", "rustls-tls"] }
# Base64 encoding for message payloads
base64 = "0.22.1"
# Error derive for clean error types
thiserror = "1.0.64"

120
README.md
View File

@@ -1,122 +1,2 @@
# herocoordinator # herocoordinator
Supervisor JSON-RPC client
This crate now includes a typed client to communicate with an external "supervisor" component via JSON-RPC 2.0 over HTTP and WebSocket, generated from the OpenAPI spec in `specs/supervisor.yaml`.
Highlights
- Transports: HTTP and WebSocket (jsonrpsee).
- Session: optional bearer token support (Authorization header).
- Methods implemented: fetch_nonce, authenticate, whoami, play, create_job, start_job, run_job, get_job_status, get_job_output, get_job_logs, list_jobs, stop_job, delete_job, clear_all_jobs.
- Types mirror the spec exactly (enum casing etc.).
Enable features
jsonrpsee client features are enabled in Cargo.toml:
- server, macros, client, http-client, ws-client.
Usage
Add to your crate imports:
```rust
use herocoordinator::supervisor::{
SupervisorClient,
ScriptType,
JobParams,
};
```
Create an HTTP client (default http://127.0.0.1:9944/)
```rust
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = SupervisorClient::new_http("http://127.0.0.1:9944/").await?;
// Optional: obtain a token (out-of-band) and set it
// client.set_bearer_token("your-token").await?;
let pk = "abcdef1234deadbeef";
let nonce = client.fetch_nonce(pk).await?;
let ok = client.authenticate(pk, "signature-here", &nonce).await?;
assert!(ok, "authentication should succeed");
// whoami
let who = client.whoami().await?;
println!("whoami: {who}");
// play
let res = client.play(r#"echo "hello""#).await?;
println!("play.output: {}", res.output);
// create a job
let job_id = client
.create_job(JobParams {
script: r#"print("hi")"#.into(),
script_type: ScriptType::Python,
caller_id: "cli".into(),
context_id: "demo".into(),
timeout: Some(30),
prerequisites: Some(vec![]),
})
.await?;
println!("created job: {job_id}");
// start a job
let started = client.start_job(&job_id).await?;
println!("job started: {}", started.success);
// get status / output / logs
let status = client.get_job_status(&job_id).await?;
println!("job status: {:?}", status);
let output = client.get_job_output(&job_id).await?;
println!("job output: {output}");
let logs = client.get_job_logs(&job_id).await?;
println!("job logs: {:?}", logs.logs);
// list / stop / delete / clear
let jobs = client.list_jobs().await?;
println!("jobs: {:?}", jobs);
// stop and delete are noop if job is already finished (server-defined behavior)
let _ = client.stop_job(&job_id).await?;
let _ = client.delete_job(&job_id).await?;
// clear all jobs (use with care)
let _ = client.clear_all_jobs().await?;
# Ok(())
# }
```
Create a WebSocket client (default ws://127.0.0.1:9944/)
```rust
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = SupervisorClient::new_ws("ws://127.0.0.1:9944/").await?;
// Use the same methods as the HTTP client
let who = client.whoami().await?;
println!("whoami: {who}");
# Ok(())
# }
```
Notes on authenticate and tokens
- The OpenAPI spec defines authenticate returning a boolean. If your deployment provides a token via a header or another method, capture it externally and set it on the client using:
- `client.set_bearer_token("...").await?`
- To remove: `client.clear_bearer_token().await?`
Types
- Enums and DTOs mirror the OpenAPI casing:
- ScriptType: "OSIS" | "SAL" | "V" | "Python"
- JobStatus: "Dispatched" | "WaitingForPrerequisites" | "Started" | "Error" | "Finished"
- JobParams include: script, script_type, caller_id, context_id, timeout?, prerequisites?.
Testing
- Unit tests verify enum casing and request param shapes. No live server needed. Run: `cargo test`.
Files
- src/supervisor/mod.rs
- src/supervisor/types.rs
- src/supervisor/error.rs
- src/supervisor/client.rs

7
src/clients/mod.rs Normal file
View File

@@ -0,0 +1,7 @@
pub mod supervisor_client;
pub use supervisor_client::{
Destination,
SupervisorClient,
SupervisorClientError,
};

View File

@@ -0,0 +1,406 @@
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use reqwest::Client as HttpClient;
use serde_json::{Value, json};
use thiserror::Error;
/// Destination for Mycelium messages
#[derive(Clone, Debug)]
pub enum Destination {
Ip(IpAddr),
Pk(String), // 64-hex public key
}
#[derive(Clone)]
pub struct SupervisorClient {
base_url: String, // e.g. "http://127.0.0.1:8990"
destination: Destination, // ip or pk
topic: String, // e.g. "supervisor.rpc"
secret: Option<String>, // optional, required by several supervisor methods
http: HttpClient,
id_counter: Arc<AtomicU64>, // JSON-RPC id generator (for inner + outer requests)
}
#[derive(Debug, Error)]
pub enum SupervisorClientError {
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("Transport timed out waiting for a reply (408)")]
TransportTimeout,
#[error("JSON-RPC error: {0}")]
RpcError(String),
#[error("Invalid response: {0}")]
InvalidResponse(String),
#[error("Missing secret for method requiring authentication")]
MissingSecret,
}
impl SupervisorClient {
/// Create a new client. base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty.
pub fn new(
base_url: impl Into<String>,
destination: Destination,
topic: impl Into<String>,
secret: Option<String>,
) -> Result<Self, SupervisorClientError> {
let mut url = base_url.into();
if url.is_empty() {
url = "http://127.0.0.1:8990".to_string();
}
let http = HttpClient::builder().build()?;
Ok(Self {
base_url: url,
destination,
topic: topic.into(),
secret,
http,
id_counter: Arc::new(AtomicU64::new(1)),
})
}
fn next_id(&self) -> u64 {
self.id_counter.fetch_add(1, Ordering::Relaxed)
}
fn build_dst(&self) -> Value {
match &self.destination {
Destination::Ip(ip) => json!({ "ip": ip.to_string() }),
Destination::Pk(pk) => json!({ "pk": pk }),
}
}
fn build_supervisor_payload(&self, method: &str, params: Value) -> Value {
json!({
"jsonrpc": "2.0",
"id": self.next_id(),
"method": method,
"params": params,
})
}
fn encode_payload(payload: &Value) -> Result<String, SupervisorClientError> {
let s = serde_json::to_string(payload)?;
Ok(BASE64_STANDARD.encode(s.as_bytes()))
}
fn build_push_request(&self, payload_b64: String) -> Value {
let dst = self.build_dst();
let msg = json!({
"dst": dst,
"topic": self.topic,
"payload": payload_b64
});
// Async path: no reply_timeout attached
json!({
"jsonrpc": "2.0",
"id": self.next_id(),
"method": "pushMessage",
"params": [ { "message": msg } ]
})
}
async fn send_push(&self, req: &Value) -> Result<Value, SupervisorClientError> {
let resp = self.http.post(&self.base_url).json(req).send().await?;
let status = resp.status();
let body: Value = resp.json().await?;
// JSON-RPC error object handling
if let Some(err) = body.get("error") {
let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
let msg = err
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("unknown error");
if code == 408 {
return Err(SupervisorClientError::TransportTimeout);
}
return Err(SupervisorClientError::RpcError(format!(
"code={code} msg={msg}"
)));
}
if !status.is_success() {
return Err(SupervisorClientError::RpcError(format!(
"HTTP status {status}, body {body}"
)));
}
Ok(body)
}
fn extract_message_id_from_result(result: &Value) -> Option<String> {
// Two possibilities per Mycelium spec oneOf:
// - PushMessageResponseId: { "id": "0123456789abcdef" }
// - InboundMessage: object containing "id" plus srcIp, ...; we still return id.
result
.get("id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
/// Generic call: build supervisor JSON-RPC message, wrap in Mycelium pushMessage, return outbound message id (hex).
pub async fn call(&self, method: &str, params: Value) -> Result<String, SupervisorClientError> {
let inner = self.build_supervisor_payload(method, params);
let payload_b64 = Self::encode_payload(&inner)?;
let push_req = self.build_push_request(payload_b64);
let resp = self.send_push(&push_req).await?;
// Expect "result" to be either inbound message or response id
match resp.get("result") {
Some(res_obj) => {
if let Some(id) = Self::extract_message_id_from_result(res_obj) {
return Ok(id);
}
// Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient)
if let Some(arr) = res_obj.as_array()
&& arr.len() == 1
&& let Some(id) = Self::extract_message_id_from_result(&arr[0])
{
return Ok(id);
}
Err(SupervisorClientError::InvalidResponse(format!(
"result did not contain message id: {res_obj}"
)))
}
None => Err(SupervisorClientError::InvalidResponse(format!(
"missing result in response: {resp}"
))),
}
}
fn need_secret(&self) -> Result<&str, SupervisorClientError> {
self.secret
.as_deref()
.ok_or(SupervisorClientError::MissingSecret)
}
// -----------------------------
// Typed wrappers for Supervisor API
// Asynchronous-only: returns outbound message id
// -----------------------------
// Runners
pub async fn list_runners(&self) -> Result<String, SupervisorClientError> {
self.call("list_runners", json!([])).await
}
pub async fn register_runner(
&self,
name: impl Into<String>,
queue: impl Into<String>,
) -> Result<String, SupervisorClientError> {
let secret = self.need_secret()?;
let params = json!([{
"secret": secret,
"name": name.into(),
"queue": queue.into()
}]);
self.call("register_runner", params).await
}
pub async fn remove_runner(
&self,
actor_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
self.call("remove_runner", json!([actor_id.into()])).await
}
pub async fn start_runner(
&self,
actor_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
self.call("start_runner", json!([actor_id.into()])).await
}
pub async fn stop_runner(
&self,
actor_id: impl Into<String>,
force: bool,
) -> Result<String, SupervisorClientError> {
self.call("stop_runner", json!([actor_id.into(), force]))
.await
}
pub async fn get_runner_status(
&self,
actor_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
self.call("get_runner_status", json!([actor_id.into()]))
.await
}
pub async fn get_all_runner_status(&self) -> Result<String, SupervisorClientError> {
self.call("get_all_runner_status", json!([])).await
}
pub async fn start_all(&self) -> Result<String, SupervisorClientError> {
self.call("start_all", json!([])).await
}
pub async fn stop_all(&self, force: bool) -> Result<String, SupervisorClientError> {
self.call("stop_all", json!([force])).await
}
pub async fn get_all_status(&self) -> Result<String, SupervisorClientError> {
self.call("get_all_status", json!([])).await
}
// Jobs
pub async fn jobs_create(&self, job: Value) -> Result<String, SupervisorClientError> {
let secret = self.need_secret()?;
let params = json!([{
"secret": secret,
"job": job
}]);
self.call("jobs.create", params).await
}
pub async fn jobs_list(&self) -> Result<String, SupervisorClientError> {
self.call("jobs.list", json!([])).await
}
pub async fn job_run(&self, job: Value) -> Result<String, SupervisorClientError> {
let secret = self.need_secret()?;
let params = json!([{
"secret": secret,
"job": job
}]);
self.call("job.run", params).await
}
pub async fn job_start(
&self,
job_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
let secret = self.need_secret()?;
let params = json!([{
"secret": secret,
"job_id": job_id.into()
}]);
self.call("job.start", params).await
}
pub async fn job_status(
&self,
job_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
self.call("job.status", json!([job_id.into()])).await
}
pub async fn job_result(
&self,
job_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
self.call("job.result", json!([job_id.into()])).await
}
pub async fn job_stop(
&self,
job_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
let secret = self.need_secret()?;
let params = json!([{
"secret": secret,
"job_id": job_id.into()
}]);
self.call("job.stop", params).await
}
pub async fn job_delete(
&self,
job_id: impl Into<String>,
) -> Result<String, SupervisorClientError> {
let secret = self.need_secret()?;
let params = json!([{
"secret": secret,
"job_id": job_id.into()
}]);
self.call("job.delete", params).await
}
// Discovery
pub async fn rpc_discover(&self) -> Result<String, SupervisorClientError> {
self.call("rpc.discover", json!([])).await
}
}
// -----------------------------
// Tests (serialization-only)
// -----------------------------
#[cfg(test)]
mod tests {
use super::*;
fn mk_client() -> SupervisorClient {
SupervisorClient::new(
"http://127.0.0.1:8990",
Destination::Pk(
"bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".to_string(),
),
"supervisor.rpc",
Some("secret".to_string()),
)
.unwrap()
}
#[test]
fn builds_dst_ip_and_pk() {
let c_ip = SupervisorClient::new(
"http://127.0.0.1:8990",
Destination::Ip("2001:db8::1".parse().unwrap()),
"supervisor.rpc",
None,
)
.unwrap();
let v_ip = c_ip.build_dst();
assert_eq!(v_ip.get("ip").unwrap().as_str().unwrap(), "2001:db8::1");
let c_pk = mk_client();
let v_pk = c_pk.build_dst();
assert_eq!(
v_pk.get("pk").unwrap().as_str().unwrap(),
"bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32"
);
}
#[test]
fn wraps_supervisor_payload_in_push_message() {
let c = mk_client();
let payload = c.build_supervisor_payload("list_runners", json!([]));
let b64 = SupervisorClient::encode_payload(&payload).unwrap();
let req = c.build_push_request(b64);
assert_eq!(req.get("method").unwrap().as_str().unwrap(), "pushMessage");
let params = req.get("params").unwrap().as_array().unwrap();
let msg = params[0].get("message").unwrap();
assert!(msg.get("payload").is_some());
assert_eq!(
msg.get("topic").unwrap().as_str().unwrap(),
"supervisor.rpc"
);
assert!(msg.get("dst").unwrap().get("pk").is_some());
}
#[test]
fn extract_message_id_works_for_both_variants() {
// PushMessageResponseId
let r1 = json!({"id":"0123456789abcdef"});
assert_eq!(
SupervisorClient::extract_message_id_from_result(&r1).unwrap(),
"0123456789abcdef"
);
// InboundMessage-like
let r2 = json!({
"id":"fedcba9876543210",
"srcIp":"449:abcd:0123:defa::1",
"payload":"hpV+"
});
assert_eq!(
SupervisorClient::extract_message_id_from_result(&r2).unwrap(),
"fedcba9876543210"
);
}
}

View File

@@ -4,4 +4,4 @@ pub mod service;
mod time; mod time;
pub mod dag; pub mod dag;
pub mod rpc; pub mod rpc;
pub mod supervisor; pub mod clients;

View File

@@ -1,18 +0,0 @@
// Public Supervisor client module entrypoint.
//
// Exposes:
// - SupervisorClient: high-level JSON-RPC client (HTTP/WS)
// - SupervisorError: typed error for transport and JSON-RPC envelopes
// - types::*: external DTOs and enums matching specs/supervisor.yaml
//
// Default endpoints (configurable at construction):
// - HTTP: http://127.0.0.1:9944/
// - WS: ws://127.0.0.1:9944/
pub mod types;
pub mod error;
pub mod client;
pub use client::SupervisorClient;
pub use error::SupervisorError;
pub use types::*;

View File

@@ -1,388 +0,0 @@
use crate::supervisor::error::SupervisorError;
use crate::supervisor::types::*;
use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{HeaderMap, HeaderValue};
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::params::{ArrayParams, ObjectParams};
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
#[derive(Debug)]
enum TransportClient {
Http(HttpClient),
Ws(WsClient),
}
/// High-level JSON-RPC client for the Supervisor (HTTP/WS).
///
/// Defaults:
/// - HTTP: http://127.0.0.1:9944/
/// - WS: ws://127.0.0.1:9944/
pub struct SupervisorClient {
base_url: String,
bearer_token: Option<String>,
inner: TransportClient,
}
impl SupervisorClient {
// -----------------------------
// Constructors
// -----------------------------
pub async fn new_http(base_url: impl Into<String>) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let inner = Self::build_http(&base_url, None).await?;
Ok(Self {
base_url,
bearer_token: None,
inner: TransportClient::Http(inner),
})
}
pub async fn new_ws(base_url: impl Into<String>) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let inner = Self::build_ws(&base_url, None).await?;
Ok(Self {
base_url,
bearer_token: None,
inner: TransportClient::Ws(inner),
})
}
pub async fn new_http_with_token(
base_url: impl Into<String>,
token: impl Into<String>,
) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let token = Some(token.into());
let inner = Self::build_http(&base_url, token.clone()).await?;
Ok(Self {
base_url,
bearer_token: token,
inner: TransportClient::Http(inner),
})
}
pub async fn new_ws_with_token(
base_url: impl Into<String>,
token: impl Into<String>,
) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let token = Some(token.into());
let inner = Self::build_ws(&base_url, token.clone()).await?;
Ok(Self {
base_url,
bearer_token: token,
inner: TransportClient::Ws(inner),
})
}
// -----------------------------
// Session management
// -----------------------------
/// Set or replace the bearer token and rebuild the underlying client with auth header.
pub async fn set_bearer_token(&mut self, token: impl Into<String>) -> Result<(), SupervisorError> {
let token = token.into();
self.bearer_token = Some(token);
self.rebuild().await
}
/// Remove the bearer token and rebuild the client without auth header.
pub async fn clear_bearer_token(&mut self) -> Result<(), SupervisorError> {
self.bearer_token = None;
self.rebuild().await
}
async fn rebuild(&mut self) -> Result<(), SupervisorError> {
let token = self.bearer_token.clone();
self.inner = match self.inner {
TransportClient::Http(_) => TransportClient::Http(Self::build_http(&self.base_url, token).await?),
TransportClient::Ws(_) => TransportClient::Ws(Self::build_ws(&self.base_url, token).await?),
};
Ok(())
}
// -----------------------------
// Internal builders
// -----------------------------
async fn build_http(base_url: &str, bearer: Option<String>) -> Result<HttpClient, SupervisorError> {
let mut headers = HeaderMap::new();
// jsonrpsee sets JSON content by default; being explicit doesn't hurt.
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("application/json; charset=utf-8"),
);
if let Some(token) = bearer {
let val = format!("Bearer {token}");
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&val).map_err(|e| SupervisorError::Transport(e.to_string()))?,
);
}
let client = HttpClientBuilder::default()
.set_headers(headers)
.build(base_url)
.map_err(|e| SupervisorError::Transport(e.to_string()))?;
Ok(client)
}
async fn build_ws(base_url: &str, bearer: Option<String>) -> Result<WsClient, SupervisorError> {
let mut headers = HeaderMap::new();
if let Some(token) = bearer {
let val = format!("Bearer {token}");
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&val).map_err(|e| SupervisorError::Transport(e.to_string()))?,
);
}
let client = WsClientBuilder::default()
.set_headers(headers)
.build(base_url)
.await
.map_err(|e| SupervisorError::Transport(e.to_string()))?;
Ok(client)
}
// -----------------------------
// Request helper (named-object params)
// -----------------------------
async fn request<R, P>(&self, method: &str, params: P) -> Result<R, SupervisorError>
where
R: DeserializeOwned,
P: Serialize,
{
let value = serde_json::to_value(params).map_err(SupervisorError::from)?;
match &self.inner {
TransportClient::Http(c) => {
match value {
Value::Object(map) => {
let mut obj = ObjectParams::new();
for (k, v) in map {
obj.insert(k.as_str(), v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, obj).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
Value::Array(arr) => {
let mut ap = ArrayParams::new();
for v in arr {
ap.insert(v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
other => {
let mut ap = ArrayParams::new();
ap.insert(other).map_err(|e| SupervisorError::Transport(e.to_string()))?;
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
}
}
TransportClient::Ws(c) => {
match value {
Value::Object(map) => {
let mut obj = ObjectParams::new();
for (k, v) in map {
obj.insert(k.as_str(), v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, obj).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
Value::Array(arr) => {
let mut ap = ArrayParams::new();
for v in arr {
ap.insert(v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
other => {
let mut ap = ArrayParams::new();
ap.insert(other).map_err(|e| SupervisorError::Transport(e.to_string()))?;
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
}
}
}
}
// -----------------------------
// API methods (specs/supervisor.yaml)
// -----------------------------
/// fetch_nonce(public_key) -> String
pub async fn fetch_nonce(&self, public_key: &str) -> Result<String, SupervisorError> {
let params = FetchNonceParams {
public_key: public_key.to_owned(),
};
self.request("fetch_nonce", params).await
}
/// authenticate(public_key, signature, nonce) -> bool
///
/// Note: If your deployment provides a token out-of-band (e.g. header or subsequent call),
/// use set_bearer_token after obtaining it.
pub async fn authenticate(
&self,
public_key: &str,
signature: &str,
nonce: &str,
) -> Result<bool, SupervisorError> {
let params = AuthenticateParams {
public_key: public_key.to_owned(),
signature: signature.to_owned(),
nonce: nonce.to_owned(),
};
self.request("authenticate", params).await
}
/// whoami() -> String (JSON string with auth info)
pub async fn whoami(&self) -> Result<String, SupervisorError> {
let params = WhoAmIParams {};
self.request("whoami", params).await
}
/// play(script) -> PlayResult
pub async fn play(&self, script: &str) -> Result<PlayResult, SupervisorError> {
let params = PlayParams {
script: script.to_owned(),
};
self.request("play", params).await
}
/// create_job(job_params) -> String (job id)
pub async fn create_job(&self, job_params: JobParams) -> Result<String, SupervisorError> {
let params = CreateJobParams { job_params };
self.request("create_job", params).await
}
/// start_job(job_id) -> StartJobResult
pub async fn start_job(&self, job_id: &str) -> Result<StartJobResult, SupervisorError> {
let params = StartJobParams {
job_id: job_id.to_owned(),
};
self.request("start_job", params).await
}
/// run_job(script, script_type, prerequisites?) -> String (output)
pub async fn run_job(
&self,
script: String,
script_type: ScriptType,
prerequisites: Option<Vec<String>>,
) -> Result<String, SupervisorError> {
let params = RunJobParams {
script,
script_type,
prerequisites,
};
self.request("run_job", params).await
}
/// get_job_status(job_id) -> JobStatus
pub async fn get_job_status(&self, job_id: &str) -> Result<JobStatus, SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("get_job_status", params).await
}
/// get_job_output(job_id) -> String
pub async fn get_job_output(&self, job_id: &str) -> Result<String, SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("get_job_output", params).await
}
/// get_job_logs(job_id) -> JobLogsResult
pub async fn get_job_logs(&self, job_id: &str) -> Result<JobLogsResult, SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("get_job_logs", params).await
}
/// list_jobs() -> Vec<String>
pub async fn list_jobs(&self) -> Result<Vec<String>, SupervisorError> {
let params = ListJobsParams {};
self.request("list_jobs", params).await
}
/// stop_job(job_id) -> null (maps to ())
pub async fn stop_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
// jsonrpsee will deserialize JSON null into unit type when expected.
self.request("stop_job", params).await
}
/// delete_job(job_id) -> null (maps to ())
pub async fn delete_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("delete_job", params).await
}
/// clear_all_jobs() -> null (maps to ())
pub async fn clear_all_jobs(&self) -> Result<(), SupervisorError> {
let params = ClearAllJobsParams {};
self.request("clear_all_jobs", params).await
}
}
// Basic compilation-only tests for method signatures and param serialization.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::to_value;
#[test]
fn params_shapes() {
let _ = to_value(FetchNonceParams {
public_key: "abc".into(),
})
.unwrap();
let _ = to_value(AuthenticateParams {
public_key: "abc".into(),
signature: "sig".into(),
nonce: "nonce".into(),
})
.unwrap();
let _ = to_value(PlayParams {
script: "echo hi".into(),
})
.unwrap();
let _ = to_value(CreateJobParams {
job_params: JobParams {
script: "echo hi".into(),
script_type: ScriptType::SAL,
caller_id: "cli".into(),
context_id: "demo".into(),
timeout: Some(30),
prerequisites: Some(vec![]),
},
})
.unwrap();
let _ = to_value(StartJobParams {
job_id: "jid".into(),
})
.unwrap();
let _ = to_value(RunJobParams {
script: "print(1)".into(),
script_type: ScriptType::Python,
prerequisites: None,
})
.unwrap();
let _ = to_value(GetJobIdParams {
job_id: "jid".into(),
})
.unwrap();
let _ = to_value(ListJobsParams {}).unwrap();
let _ = to_value(ClearAllJobsParams {}).unwrap();
}
}

View File

@@ -1,37 +0,0 @@
use serde_json::Value;
use std::fmt::{Display, Formatter};
#[derive(Debug)]
pub enum SupervisorError {
Transport(String),
Rpc { code: i32, message: String, data: Option<Value> },
Serde(serde_json::Error),
}
impl Display for SupervisorError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SupervisorError::Transport(e) => write!(f, "transport error: {e}"),
SupervisorError::Rpc { code, message, .. } => write!(f, "rpc error {code}: {message}"),
SupervisorError::Serde(e) => write!(f, "serde error: {e}"),
}
}
}
impl std::error::Error for SupervisorError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
SupervisorError::Serde(e) => Some(e),
_ => None,
}
}
}
impl From<serde_json::Error> for SupervisorError {
fn from(e: serde_json::Error) -> Self {
SupervisorError::Serde(e)
}
}
// Note: jsonrpsee error types differ across transports/versions; we avoid a blanket `From` impl.
// Callers map transport errors to SupervisorError::Transport at the callsite to reduce coupling.

View File

@@ -1,190 +0,0 @@
use serde::{Deserialize, Serialize};
// Types mirroring specs/supervisor.yaml exactly (wire format)
// Enums
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScriptType {
#[serde(rename = "OSIS")]
OSIS,
#[serde(rename = "SAL")]
SAL,
#[serde(rename = "V")]
V,
#[serde(rename = "Python")]
Python,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum JobStatus {
#[serde(rename = "Dispatched")]
Dispatched,
#[serde(rename = "WaitingForPrerequisites")]
WaitingForPrerequisites,
#[serde(rename = "Started")]
Started,
#[serde(rename = "Error")]
Error,
#[serde(rename = "Finished")]
Finished,
}
// DTOs
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobParams {
pub script: String,
pub script_type: ScriptType,
pub caller_id: String,
pub context_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub timeout: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub prerequisites: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayResult {
pub output: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartJobResult {
pub success: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobLogsResult {
pub logs: Option<String>,
}
// Params (named-object) per method
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FetchNonceParams {
pub public_key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthenticateParams {
pub public_key: String,
pub signature: String,
pub nonce: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WhoAmIParams {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayParams {
pub script: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateJobParams {
pub job_params: JobParams,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartJobParams {
pub job_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunJobParams {
pub script: String,
pub script_type: ScriptType,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub prerequisites: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetJobIdParams {
pub job_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ListJobsParams {}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClearAllJobsParams {}
// Conversions to/from internal models when convenient
impl From<crate::models::ScriptType> for ScriptType {
fn from(v: crate::models::ScriptType) -> Self {
match v {
crate::models::ScriptType::Osis => ScriptType::OSIS,
crate::models::ScriptType::Sal => ScriptType::SAL,
crate::models::ScriptType::V => ScriptType::V,
crate::models::ScriptType::Python => ScriptType::Python,
}
}
}
impl From<ScriptType> for crate::models::ScriptType {
fn from(v: ScriptType) -> Self {
match v {
ScriptType::OSIS => crate::models::ScriptType::Osis,
ScriptType::SAL => crate::models::ScriptType::Sal,
ScriptType::V => crate::models::ScriptType::V,
ScriptType::Python => crate::models::ScriptType::Python,
}
}
}
impl From<crate::models::JobStatus> for JobStatus {
fn from(v: crate::models::JobStatus) -> Self {
match v {
crate::models::JobStatus::Dispatched => JobStatus::Dispatched,
crate::models::JobStatus::WaitingForPrerequisites => JobStatus::WaitingForPrerequisites,
crate::models::JobStatus::Started => JobStatus::Started,
crate::models::JobStatus::Error => JobStatus::Error,
crate::models::JobStatus::Finished => JobStatus::Finished,
}
}
}
impl From<JobStatus> for crate::models::JobStatus {
fn from(v: JobStatus) -> Self {
match v {
JobStatus::Dispatched => crate::models::JobStatus::Dispatched,
JobStatus::WaitingForPrerequisites => crate::models::JobStatus::WaitingForPrerequisites,
JobStatus::Started => crate::models::JobStatus::Started,
JobStatus::Error => crate::models::JobStatus::Error,
JobStatus::Finished => crate::models::JobStatus::Finished,
}
}
}
// Basic serialization tests (casing)
#[cfg(test)]
mod tests {
use super::*;
use serde_json::{json, to_value};
#[test]
fn script_type_casing() {
assert_eq!(to_value(&ScriptType::OSIS).unwrap(), json!("OSIS"));
assert_eq!(to_value(&ScriptType::SAL).unwrap(), json!("SAL"));
assert_eq!(to_value(&ScriptType::V).unwrap(), json!("V"));
assert_eq!(to_value(&ScriptType::Python).unwrap(), json!("Python"));
}
#[test]
fn job_status_casing() {
assert_eq!(to_value(&JobStatus::Dispatched).unwrap(), json!("Dispatched"));
assert_eq!(
to_value(&JobStatus::WaitingForPrerequisites).unwrap(),
json!("WaitingForPrerequisites")
);
assert_eq!(to_value(&JobStatus::Started).unwrap(), json!("Started"));
assert_eq!(to_value(&JobStatus::Error).unwrap(), json!("Error"));
assert_eq!(to_value(&JobStatus::Finished).unwrap(), json!("Finished"));
}
}