add circle rhai repl and backend start cmd

This commit is contained in:
timurgordon
2025-06-05 00:29:10 +03:00
parent f22d40c980
commit ae3077033b
15 changed files with 5522 additions and 8 deletions

View File

@@ -1,28 +1,32 @@
[package]
name = "circle_server_ws"
name = "circle_ws_lib" # Renamed to reflect library nature
version = "0.1.0"
edition = "2021"
[lib]
name = "circle_ws_lib"
path = "src/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "4"
actix-web-actors = "4"
actix = "0.13"
env_logger = "0.10"
env_logger = "0.10" # Keep for logging within the lib
log = "0.4"
clap = { version = "4.4", features = ["derive"] }
# clap is removed as CLI parsing moves to the orchestrator bin
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
redis = { version = "0.25.0", features = ["tokio-comp"] } # For async Redis with Actix
uuid = { version = "1.6", features = ["v4", "serde"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For polling interval
uuid = { version = "1.6", features = ["v4", "serde"] } # Still used by RhaiClient or for task details
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } # Added "time" for Duration
chrono = { version = "0.4", features = ["serde"] } # For timestamps
rhai_client = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/rhaj/src/client" }
rhai_client = { path = "../../rhailib/src/client" } # Corrected relative path
[dev-dependencies]
tokio-tungstenite = { version = "0.23.0", features = ["native-tls"] }
futures-util = "0.3" # For StreamExt and SinkExt on WebSocket stream
url = "2.5.0" # For parsing WebSocket URL
circle_client_ws = { path = "../client_ws" }
uuid = { version = "1.6", features = ["v4", "serde"] } # For e2e example, if it still uses Uuid directly for req id
# circle_client_ws = { path = "../client_ws" } # This might need adjustment if it's a test client for the old binary
# uuid = { version = "1.6", features = ["v4", "serde"] } # Already in dependencies

302
server_ws/src/lib.rs Normal file
View File

@@ -0,0 +1,302 @@
use actix_web::{web, App, HttpRequest, HttpServer, HttpResponse, Error};
use actix_web_actors::ws;
use actix::{Actor, ActorContext, StreamHandler, AsyncContext, WrapFuture, ActorFutureExt};
// clap::Parser removed
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::time::Duration;
use rhai_client::RhaiClientError;
use rhai_client::RhaiClient;
use tokio::task::JoinHandle;
use tokio::sync::oneshot; // For sending the server handle back
// Newtype wrappers for distinct app_data types
#[derive(Clone)]
struct AppCircleName(String);
#[derive(Clone)]
struct AppRedisUrl(String);
// JSON-RPC 2.0 Structures (remain the same)
#[derive(Serialize, Deserialize, Debug, Clone)]
struct JsonRpcRequest {
jsonrpc: String,
method: String,
params: Value,
id: Option<Value>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct JsonRpcResponse {
jsonrpc: String,
result: Option<Value>,
error: Option<JsonRpcError>,
id: Value,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct JsonRpcError {
code: i32,
message: String,
data: Option<Value>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct PlayParams {
script: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct PlayResult {
output: String,
}
// WebSocket Actor
struct CircleWs {
server_circle_name: String,
redis_url_for_client: String,
}
const TASK_TIMEOUT_DURATION: Duration = Duration::from_secs(30);
const TASK_POLL_INTERVAL_DURATION: Duration = Duration::from_millis(200);
impl CircleWs {
fn new(name: String, redis_url: String) -> Self {
Self {
server_circle_name: name,
redis_url_for_client: redis_url,
}
}
}
impl Actor for CircleWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
log::info!("WebSocket session started for server dedicated to: {}", self.server_circle_name);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> actix::Running {
log::info!("WebSocket session stopping for server dedicated to: {}", self.server_circle_name);
actix::Running::Stop
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CircleWs {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Text(text)) => {
log::debug!("WS Text for {}: {}", self.server_circle_name, text); // Changed to debug for less noise
match serde_json::from_str::<JsonRpcRequest>(&text) {
Ok(req) => {
let client_rpc_id = req.id.clone().unwrap_or(Value::Null);
if req.method == "play" {
match serde_json::from_value::<PlayParams>(req.params.clone()) {
Ok(play_params) => {
let script_content = play_params.script;
// Use the server_circle_name which should be correctly set now
let current_circle_name_for_rhai_client = self.server_circle_name.clone();
let rpc_id_for_client = client_rpc_id.clone();
let redis_url_clone = self.redis_url_for_client.clone();
log::info!("Circle '{}' WS: Received 'play' request, ID: {:?}, for RhaiClient target circle: '{}'", self.server_circle_name, rpc_id_for_client, current_circle_name_for_rhai_client);
let fut = async move {
match RhaiClient::new(&redis_url_clone) {
Ok(rhai_task_client) => {
rhai_task_client.submit_script_and_await_result(
&current_circle_name_for_rhai_client, // This name is used for Redis queue
script_content,
Some(rpc_id_for_client.clone()),
TASK_TIMEOUT_DURATION,
TASK_POLL_INTERVAL_DURATION,
).await
}
Err(e) => {
log::error!("Circle '{}' WS: Failed to create RhaiClient for Redis URL {}: {}", current_circle_name_for_rhai_client, redis_url_clone, e);
Err(e)
}
}
};
ctx.spawn(fut.into_actor(self).map(move |result, _act, ws_ctx| {
let response = match result {
Ok(task_details) => {
if task_details.status == "completed" {
// task_details itself doesn't have a task_id field.
// The task_id is known by the client that initiated the poll.
// We log with client_rpc_id which is the JSON-RPC request ID.
log::info!("Circle '{}' WS: Request ID {:?} completed successfully. Output: {:?}", _act.server_circle_name, client_rpc_id, task_details.output);
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(PlayResult {
output: task_details.output.unwrap_or_default()
}).unwrap()),
error: None,
id: client_rpc_id,
}
} else { // status == "error"
log::warn!("Circle '{}' WS: Request ID {:?} execution failed. Error: {:?}", _act.server_circle_name, client_rpc_id, task_details.error);
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32004,
message: task_details.error.unwrap_or_else(|| "Script execution failed".to_string()),
data: None,
}),
id: client_rpc_id,
}
}
}
Err(rhai_err) => {
log::error!("Circle '{}' WS: RhaiClient operation failed for req ID {:?}: {}", _act.server_circle_name, client_rpc_id, rhai_err);
let (code, message) = match rhai_err {
RhaiClientError::Timeout(task_id) => (-32002, format!("Timeout: {}", task_id)),
RhaiClientError::RedisError(e) => (-32003, format!("Redis error: {}", e)),
RhaiClientError::SerializationError(e) => (-32003, format!("Serialization error: {}", e)),
RhaiClientError::TaskNotFound(task_id) => (-32005, format!("Task not found: {}", task_id)),
};
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
id: client_rpc_id,
error: Some(JsonRpcError { code, message, data: None }),
}
}
};
ws_ctx.text(serde_json::to_string(&response).unwrap());
}));
}
Err(e) => {
log::error!("Circle '{}' WS: Invalid params for 'play' method: {}", self.server_circle_name, e);
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(), result: None, id: client_rpc_id,
error: Some(JsonRpcError { code: -32602, message: "Invalid params".to_string(), data: Some(Value::String(e.to_string())) }),
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
}
}
} else {
log::warn!("Circle '{}' WS: Method not found: {}", self.server_circle_name, req.method);
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(), result: None, id: client_rpc_id,
error: Some(JsonRpcError { code: -32601, message: "Method not found".to_string(), data: None }),
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
}
}
Err(e) => {
log::error!("Circle '{}' WS: Failed to parse JSON-RPC request: {}", self.server_circle_name, e);
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(), result: None, id: Value::Null,
error: Some(JsonRpcError { code: -32700, message: "Parse error".to_string(), data: Some(Value::String(e.to_string())) }),
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
}
}
}
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Pong(_)) => {},
Ok(ws::Message::Binary(_bin)) => log::warn!("Circle '{}' WS: Binary messages not supported.", self.server_circle_name),
Ok(ws::Message::Close(reason)) => {
log::info!("Circle '{}' WS: Close message received. Reason: {:?}", self.server_circle_name, reason);
ctx.close(reason);
ctx.stop();
}
Ok(ws::Message::Continuation(_)) => ctx.stop(),
Ok(ws::Message::Nop) => (),
Err(e) => {
log::error!("Circle '{}' WS: Error: {:?}", self.server_circle_name, e);
ctx.stop();
}
}
}
}
// Modified ws_handler to accept newtype wrapped app_data
async fn ws_handler_modified(
req: HttpRequest,
stream: web::Payload,
app_circle_name: web::Data<AppCircleName>, // Use wrapped type
app_redis_url: web::Data<AppRedisUrl>, // Use wrapped type
) -> Result<HttpResponse, Error> {
let circle_name_str = app_circle_name.0.clone();
let redis_url_str = app_redis_url.0.clone();
log::info!("WebSocket handshake attempt for server: '{}' with redis: '{}'", circle_name_str, redis_url_str);
let resp = ws::start(
CircleWs::new(circle_name_str, redis_url_str), // Pass unwrapped strings
&req,
stream
)?;
Ok(resp)
}
// Public factory function to spawn the server
pub fn spawn_circle_ws_server(
_circle_id: u32,
circle_name: String,
port: u16,
redis_url: String,
// Sender to send the server handle back to the orchestrator
server_handle_tx: oneshot::Sender<actix_web::dev::Server>,
) -> JoinHandle<std::io::Result<()>> {
let circle_name_for_log = circle_name.clone();
// redis_url_for_log is not used, but kept for consistency if needed later
tokio::spawn(async move {
let circle_name_outer = circle_name;
let redis_url_outer = redis_url;
let app_factory = move || {
App::new()
.app_data(web::Data::new(AppCircleName(circle_name_outer.clone())))
.app_data(web::Data::new(AppRedisUrl(redis_url_outer.clone())))
.route("/ws", web::get().to(ws_handler_modified))
.default_service(web::route().to(|| async { HttpResponse::NotFound().body("404 Not Found") }))
};
let server_builder = HttpServer::new(app_factory);
let bound_server = match server_builder.bind(("127.0.0.1", port)) {
Ok(srv) => {
log::info!(
"Successfully bound WebSocket server for Circle: '{}' on port {}. Starting...",
circle_name_for_log, port
);
srv
}
Err(e) => {
log::error!(
"Failed to bind WebSocket server for Circle '{}' on port {}: {}",
circle_name_for_log, port, e
);
// If binding fails, we can't send a server handle.
// The orchestrator will see the JoinHandle error out or the oneshot::Sender drop.
return Err(e);
}
};
let server_runnable: actix_web::dev::Server = bound_server.run();
// Send the server handle back to the orchestrator
if server_handle_tx.send(server_runnable.clone()).is_err() {
log::error!(
"Failed to send server handle back to orchestrator for Circle '{}'. Orchestrator might have shut down.",
circle_name_for_log
);
// Server might still run, but orchestrator can't stop it gracefully via this handle.
// Consider stopping it here if sending the handle is critical.
// For now, let it proceed, but log the error.
}
// Now await the server_runnable (which is the Server handle itself)
if let Err(e) = server_runnable.await {
log::error!("WebSocket server for Circle '{}' on port {} failed during run: {}", circle_name_for_log, port, e);
return Err(e);
}
log::info!("WebSocket server for Circle '{}' on port {} shut down gracefully.", circle_name_for_log, port);
Ok(())
})
}