Add supervisor API

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-08-22 23:19:05 +02:00
parent 7ce19f8b6d
commit bc30c9cc89
8 changed files with 1412 additions and 12 deletions

View File

@@ -4,3 +4,4 @@ pub mod service;
mod time;
pub mod dag;
pub mod rpc;
pub mod supervisor;

18
src/supervisor.rs Normal file
View File

@@ -0,0 +1,18 @@
// Public Supervisor client module entrypoint.
//
// Exposes:
// - SupervisorClient: high-level JSON-RPC client (HTTP/WS)
// - SupervisorError: typed error for transport and JSON-RPC envelopes
// - types::*: external DTOs and enums matching specs/supervisor.yaml
//
// Default endpoints (configurable at construction):
// - HTTP: http://127.0.0.1:9944/
// - WS: ws://127.0.0.1:9944/
pub mod types;
pub mod error;
pub mod client;
pub use client::SupervisorClient;
pub use error::SupervisorError;
pub use types::*;

388
src/supervisor/client.rs Normal file
View File

@@ -0,0 +1,388 @@
use crate::supervisor::error::SupervisorError;
use crate::supervisor::types::*;
use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{HeaderMap, HeaderValue};
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::params::{ArrayParams, ObjectParams};
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
#[derive(Debug)]
enum TransportClient {
Http(HttpClient),
Ws(WsClient),
}
/// High-level JSON-RPC client for the Supervisor (HTTP/WS).
///
/// Defaults:
/// - HTTP: http://127.0.0.1:9944/
/// - WS: ws://127.0.0.1:9944/
pub struct SupervisorClient {
base_url: String,
bearer_token: Option<String>,
inner: TransportClient,
}
impl SupervisorClient {
// -----------------------------
// Constructors
// -----------------------------
pub async fn new_http(base_url: impl Into<String>) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let inner = Self::build_http(&base_url, None).await?;
Ok(Self {
base_url,
bearer_token: None,
inner: TransportClient::Http(inner),
})
}
pub async fn new_ws(base_url: impl Into<String>) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let inner = Self::build_ws(&base_url, None).await?;
Ok(Self {
base_url,
bearer_token: None,
inner: TransportClient::Ws(inner),
})
}
pub async fn new_http_with_token(
base_url: impl Into<String>,
token: impl Into<String>,
) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let token = Some(token.into());
let inner = Self::build_http(&base_url, token.clone()).await?;
Ok(Self {
base_url,
bearer_token: token,
inner: TransportClient::Http(inner),
})
}
pub async fn new_ws_with_token(
base_url: impl Into<String>,
token: impl Into<String>,
) -> Result<Self, SupervisorError> {
let base_url = base_url.into();
let token = Some(token.into());
let inner = Self::build_ws(&base_url, token.clone()).await?;
Ok(Self {
base_url,
bearer_token: token,
inner: TransportClient::Ws(inner),
})
}
// -----------------------------
// Session management
// -----------------------------
/// Set or replace the bearer token and rebuild the underlying client with auth header.
pub async fn set_bearer_token(&mut self, token: impl Into<String>) -> Result<(), SupervisorError> {
let token = token.into();
self.bearer_token = Some(token);
self.rebuild().await
}
/// Remove the bearer token and rebuild the client without auth header.
pub async fn clear_bearer_token(&mut self) -> Result<(), SupervisorError> {
self.bearer_token = None;
self.rebuild().await
}
async fn rebuild(&mut self) -> Result<(), SupervisorError> {
let token = self.bearer_token.clone();
self.inner = match self.inner {
TransportClient::Http(_) => TransportClient::Http(Self::build_http(&self.base_url, token).await?),
TransportClient::Ws(_) => TransportClient::Ws(Self::build_ws(&self.base_url, token).await?),
};
Ok(())
}
// -----------------------------
// Internal builders
// -----------------------------
async fn build_http(base_url: &str, bearer: Option<String>) -> Result<HttpClient, SupervisorError> {
let mut headers = HeaderMap::new();
// jsonrpsee sets JSON content by default; being explicit doesn't hurt.
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("application/json; charset=utf-8"),
);
if let Some(token) = bearer {
let val = format!("Bearer {token}");
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&val).map_err(|e| SupervisorError::Transport(e.to_string()))?,
);
}
let client = HttpClientBuilder::default()
.set_headers(headers)
.build(base_url)
.map_err(|e| SupervisorError::Transport(e.to_string()))?;
Ok(client)
}
async fn build_ws(base_url: &str, bearer: Option<String>) -> Result<WsClient, SupervisorError> {
let mut headers = HeaderMap::new();
if let Some(token) = bearer {
let val = format!("Bearer {token}");
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&val).map_err(|e| SupervisorError::Transport(e.to_string()))?,
);
}
let client = WsClientBuilder::default()
.set_headers(headers)
.build(base_url)
.await
.map_err(|e| SupervisorError::Transport(e.to_string()))?;
Ok(client)
}
// -----------------------------
// Request helper (named-object params)
// -----------------------------
async fn request<R, P>(&self, method: &str, params: P) -> Result<R, SupervisorError>
where
R: DeserializeOwned,
P: Serialize,
{
let value = serde_json::to_value(params).map_err(SupervisorError::from)?;
match &self.inner {
TransportClient::Http(c) => {
match value {
Value::Object(map) => {
let mut obj = ObjectParams::new();
for (k, v) in map {
obj.insert(k.as_str(), v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, obj).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
Value::Array(arr) => {
let mut ap = ArrayParams::new();
for v in arr {
ap.insert(v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
other => {
let mut ap = ArrayParams::new();
ap.insert(other).map_err(|e| SupervisorError::Transport(e.to_string()))?;
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
}
}
TransportClient::Ws(c) => {
match value {
Value::Object(map) => {
let mut obj = ObjectParams::new();
for (k, v) in map {
obj.insert(k.as_str(), v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, obj).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
Value::Array(arr) => {
let mut ap = ArrayParams::new();
for v in arr {
ap.insert(v).map_err(|e| SupervisorError::Transport(e.to_string()))?;
}
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
other => {
let mut ap = ArrayParams::new();
ap.insert(other).map_err(|e| SupervisorError::Transport(e.to_string()))?;
c.request(method, ap).await.map_err(|e| SupervisorError::Transport(e.to_string()))
}
}
}
}
}
// -----------------------------
// API methods (specs/supervisor.yaml)
// -----------------------------
/// fetch_nonce(public_key) -> String
pub async fn fetch_nonce(&self, public_key: &str) -> Result<String, SupervisorError> {
let params = FetchNonceParams {
public_key: public_key.to_owned(),
};
self.request("fetch_nonce", params).await
}
/// authenticate(public_key, signature, nonce) -> bool
///
/// Note: If your deployment provides a token out-of-band (e.g. header or subsequent call),
/// use set_bearer_token after obtaining it.
pub async fn authenticate(
&self,
public_key: &str,
signature: &str,
nonce: &str,
) -> Result<bool, SupervisorError> {
let params = AuthenticateParams {
public_key: public_key.to_owned(),
signature: signature.to_owned(),
nonce: nonce.to_owned(),
};
self.request("authenticate", params).await
}
/// whoami() -> String (JSON string with auth info)
pub async fn whoami(&self) -> Result<String, SupervisorError> {
let params = WhoAmIParams {};
self.request("whoami", params).await
}
/// play(script) -> PlayResult
pub async fn play(&self, script: &str) -> Result<PlayResult, SupervisorError> {
let params = PlayParams {
script: script.to_owned(),
};
self.request("play", params).await
}
/// create_job(job_params) -> String (job id)
pub async fn create_job(&self, job_params: JobParams) -> Result<String, SupervisorError> {
let params = CreateJobParams { job_params };
self.request("create_job", params).await
}
/// start_job(job_id) -> StartJobResult
pub async fn start_job(&self, job_id: &str) -> Result<StartJobResult, SupervisorError> {
let params = StartJobParams {
job_id: job_id.to_owned(),
};
self.request("start_job", params).await
}
/// run_job(script, script_type, prerequisites?) -> String (output)
pub async fn run_job(
&self,
script: String,
script_type: ScriptType,
prerequisites: Option<Vec<String>>,
) -> Result<String, SupervisorError> {
let params = RunJobParams {
script,
script_type,
prerequisites,
};
self.request("run_job", params).await
}
/// get_job_status(job_id) -> JobStatus
pub async fn get_job_status(&self, job_id: &str) -> Result<JobStatus, SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("get_job_status", params).await
}
/// get_job_output(job_id) -> String
pub async fn get_job_output(&self, job_id: &str) -> Result<String, SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("get_job_output", params).await
}
/// get_job_logs(job_id) -> JobLogsResult
pub async fn get_job_logs(&self, job_id: &str) -> Result<JobLogsResult, SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("get_job_logs", params).await
}
/// list_jobs() -> Vec<String>
pub async fn list_jobs(&self) -> Result<Vec<String>, SupervisorError> {
let params = ListJobsParams {};
self.request("list_jobs", params).await
}
/// stop_job(job_id) -> null (maps to ())
pub async fn stop_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
// jsonrpsee will deserialize JSON null into unit type when expected.
self.request("stop_job", params).await
}
/// delete_job(job_id) -> null (maps to ())
pub async fn delete_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let params = GetJobIdParams {
job_id: job_id.to_owned(),
};
self.request("delete_job", params).await
}
/// clear_all_jobs() -> null (maps to ())
pub async fn clear_all_jobs(&self) -> Result<(), SupervisorError> {
let params = ClearAllJobsParams {};
self.request("clear_all_jobs", params).await
}
}
// Basic compilation-only tests for method signatures and param serialization.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::to_value;
#[test]
fn params_shapes() {
let _ = to_value(FetchNonceParams {
public_key: "abc".into(),
})
.unwrap();
let _ = to_value(AuthenticateParams {
public_key: "abc".into(),
signature: "sig".into(),
nonce: "nonce".into(),
})
.unwrap();
let _ = to_value(PlayParams {
script: "echo hi".into(),
})
.unwrap();
let _ = to_value(CreateJobParams {
job_params: JobParams {
script: "echo hi".into(),
script_type: ScriptType::SAL,
caller_id: "cli".into(),
context_id: "demo".into(),
timeout: Some(30),
prerequisites: Some(vec![]),
},
})
.unwrap();
let _ = to_value(StartJobParams {
job_id: "jid".into(),
})
.unwrap();
let _ = to_value(RunJobParams {
script: "print(1)".into(),
script_type: ScriptType::Python,
prerequisites: None,
})
.unwrap();
let _ = to_value(GetJobIdParams {
job_id: "jid".into(),
})
.unwrap();
let _ = to_value(ListJobsParams {}).unwrap();
let _ = to_value(ClearAllJobsParams {}).unwrap();
}
}

37
src/supervisor/error.rs Normal file
View File

@@ -0,0 +1,37 @@
use serde_json::Value;
use std::fmt::{Display, Formatter};
#[derive(Debug)]
pub enum SupervisorError {
Transport(String),
Rpc { code: i32, message: String, data: Option<Value> },
Serde(serde_json::Error),
}
impl Display for SupervisorError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SupervisorError::Transport(e) => write!(f, "transport error: {e}"),
SupervisorError::Rpc { code, message, .. } => write!(f, "rpc error {code}: {message}"),
SupervisorError::Serde(e) => write!(f, "serde error: {e}"),
}
}
}
impl std::error::Error for SupervisorError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
SupervisorError::Serde(e) => Some(e),
_ => None,
}
}
}
impl From<serde_json::Error> for SupervisorError {
fn from(e: serde_json::Error) -> Self {
SupervisorError::Serde(e)
}
}
// Note: jsonrpsee error types differ across transports/versions; we avoid a blanket `From` impl.
// Callers map transport errors to SupervisorError::Transport at the callsite to reduce coupling.

190
src/supervisor/types.rs Normal file
View File

@@ -0,0 +1,190 @@
use serde::{Deserialize, Serialize};
// Types mirroring specs/supervisor.yaml exactly (wire format)
// Enums
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScriptType {
#[serde(rename = "OSIS")]
OSIS,
#[serde(rename = "SAL")]
SAL,
#[serde(rename = "V")]
V,
#[serde(rename = "Python")]
Python,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum JobStatus {
#[serde(rename = "Dispatched")]
Dispatched,
#[serde(rename = "WaitingForPrerequisites")]
WaitingForPrerequisites,
#[serde(rename = "Started")]
Started,
#[serde(rename = "Error")]
Error,
#[serde(rename = "Finished")]
Finished,
}
// DTOs
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobParams {
pub script: String,
pub script_type: ScriptType,
pub caller_id: String,
pub context_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub timeout: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub prerequisites: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayResult {
pub output: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartJobResult {
pub success: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobLogsResult {
pub logs: Option<String>,
}
// Params (named-object) per method
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FetchNonceParams {
pub public_key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthenticateParams {
pub public_key: String,
pub signature: String,
pub nonce: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WhoAmIParams {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayParams {
pub script: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateJobParams {
pub job_params: JobParams,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartJobParams {
pub job_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunJobParams {
pub script: String,
pub script_type: ScriptType,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub prerequisites: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetJobIdParams {
pub job_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ListJobsParams {}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClearAllJobsParams {}
// Conversions to/from internal models when convenient
impl From<crate::models::ScriptType> for ScriptType {
fn from(v: crate::models::ScriptType) -> Self {
match v {
crate::models::ScriptType::Osis => ScriptType::OSIS,
crate::models::ScriptType::Sal => ScriptType::SAL,
crate::models::ScriptType::V => ScriptType::V,
crate::models::ScriptType::Python => ScriptType::Python,
}
}
}
impl From<ScriptType> for crate::models::ScriptType {
fn from(v: ScriptType) -> Self {
match v {
ScriptType::OSIS => crate::models::ScriptType::Osis,
ScriptType::SAL => crate::models::ScriptType::Sal,
ScriptType::V => crate::models::ScriptType::V,
ScriptType::Python => crate::models::ScriptType::Python,
}
}
}
impl From<crate::models::JobStatus> for JobStatus {
fn from(v: crate::models::JobStatus) -> Self {
match v {
crate::models::JobStatus::Dispatched => JobStatus::Dispatched,
crate::models::JobStatus::WaitingForPrerequisites => JobStatus::WaitingForPrerequisites,
crate::models::JobStatus::Started => JobStatus::Started,
crate::models::JobStatus::Error => JobStatus::Error,
crate::models::JobStatus::Finished => JobStatus::Finished,
}
}
}
impl From<JobStatus> for crate::models::JobStatus {
fn from(v: JobStatus) -> Self {
match v {
JobStatus::Dispatched => crate::models::JobStatus::Dispatched,
JobStatus::WaitingForPrerequisites => crate::models::JobStatus::WaitingForPrerequisites,
JobStatus::Started => crate::models::JobStatus::Started,
JobStatus::Error => crate::models::JobStatus::Error,
JobStatus::Finished => crate::models::JobStatus::Finished,
}
}
}
// Basic serialization tests (casing)
#[cfg(test)]
mod tests {
use super::*;
use serde_json::{json, to_value};
#[test]
fn script_type_casing() {
assert_eq!(to_value(&ScriptType::OSIS).unwrap(), json!("OSIS"));
assert_eq!(to_value(&ScriptType::SAL).unwrap(), json!("SAL"));
assert_eq!(to_value(&ScriptType::V).unwrap(), json!("V"));
assert_eq!(to_value(&ScriptType::Python).unwrap(), json!("Python"));
}
#[test]
fn job_status_casing() {
assert_eq!(to_value(&JobStatus::Dispatched).unwrap(), json!("Dispatched"));
assert_eq!(
to_value(&JobStatus::WaitingForPrerequisites).unwrap(),
json!("WaitingForPrerequisites")
);
assert_eq!(to_value(&JobStatus::Started).unwrap(), json!("Started"));
assert_eq!(to_value(&JobStatus::Error).unwrap(), json!("Error"));
assert_eq!(to_value(&JobStatus::Finished).unwrap(), json!("Finished"));
}
}