From f30706a25ada6c2b33a87d4b4e6512bdd04c4c17 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 22 Aug 2025 12:28:36 +0200 Subject: [PATCH] Add service layer to abstract business logic Signed-off-by: Lee Smet --- src/lib.rs | 1 + src/main.rs | 5 ++- src/rpc.rs | 67 +++++++++++++++++++--------------- src/service.rs | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 140 insertions(+), 30 deletions(-) create mode 100644 src/service.rs diff --git a/src/lib.rs b/src/lib.rs index 83161a2..7580f2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ pub mod models; pub mod storage; +pub mod service; mod time; pub mod dag; pub mod rpc; diff --git a/src/main.rs b/src/main.rs index e6ec894..1c569d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,8 +82,11 @@ async fn main() { .await .expect("Failed to connect to Redis"); + // Initialize Service + let service = herocoordinator::service::AppService::new(redis); + // Shared application state - let state = Arc::new(herocoordinator::rpc::AppState::new(redis)); + let state = Arc::new(herocoordinator::rpc::AppState::new(service)); // Build RPC modules for both servers let http_module = herocoordinator::rpc::build_module(state.clone()); diff --git a/src/rpc.rs b/src/rpc.rs index 121c2aa..0e38919 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -13,19 +13,19 @@ use serde::Deserialize; use serde_json::{Value, json}; use crate::{ - dag::{DagError, FlowDag, build_flow_dag}, + dag::{DagError, FlowDag}, models::{Actor, Context, Flow, Job, Message, MessageFormatType, Runner, ScriptType}, - storage::RedisDriver, + service::AppService, time::current_timestamp, }; pub struct AppState { - pub redis: RedisDriver, + pub service: AppService, } impl AppState { - pub fn new(redis: RedisDriver) -> Self { - Self { redis } + pub fn new(service: AppService) -> Self { + Self { service } } } @@ -354,9 +354,9 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?; let actor = p.actor.into_domain().map_err(invalid_params_err)?; - state - .redis - .save_actor(p.context_id, &actor) + let actor = state + .service + .create_actor(p.context_id, actor) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(actor) @@ -372,7 +372,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; let actor = state - .redis + .service .load_actor(p.context_id, p.id) .await .map_err(storage_err)?; @@ -391,7 +391,11 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?; let ctx = p.context.into_domain().map_err(invalid_params_err)?; - state.redis.save_context(&ctx).await.map_err(storage_err)?; + let ctx = state + .service + .create_context(ctx) + .await + .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(ctx) } }) @@ -404,7 +408,11 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?; - let ctx = state.redis.load_context(p.id).await.map_err(storage_err)?; + let ctx = state + .service + .load_context(p.id) + .await + .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(ctx) } }) @@ -420,9 +428,9 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?; let runner = p.runner.into_domain().map_err(invalid_params_err)?; - state - .redis - .save_runner(p.context_id, &runner) + let runner = state + .service + .create_runner(p.context_id, runner) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(runner) @@ -438,7 +446,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?; let runner = state - .redis + .service .load_runner(p.context_id, p.id) .await .map_err(storage_err)?; @@ -457,9 +465,9 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?; let flow = p.flow.into_domain().map_err(invalid_params_err)?; - state - .redis - .save_flow(p.context_id, &flow) + let flow = state + .service + .create_flow(p.context_id, flow) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(flow) @@ -475,7 +483,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; let flow = state - .redis + .service .load_flow(p.context_id, p.id) .await .map_err(storage_err)?; @@ -491,7 +499,9 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; - let dag: FlowDag = build_flow_dag(&state.redis, p.context_id, p.id) + let dag: FlowDag = state + .service + .flow_dag(p.context_id, p.id) .await .map_err(dag_err)?; Ok::<_, ErrorObjectOwned>(dag) @@ -509,12 +519,11 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: JobCreateParams = params.parse().map_err(invalid_params_err)?; let job = p.job.into_domain().map_err(invalid_params_err)?; - state - .redis - .save_job(p.context_id, &job) + let job = state + .service + .create_job(p.context_id, job) .await .map_err(storage_err)?; - Ok::<_, ErrorObjectOwned>(job) } }) @@ -528,7 +537,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: JobLoadParams = params.parse().map_err(invalid_params_err)?; let job = state - .redis + .service .load_job(p.context_id, p.caller_id, p.id) .await .map_err(storage_err)?; @@ -547,9 +556,9 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?; let message = p.message.into_domain().map_err(invalid_params_err)?; - state - .redis - .save_message(p.context_id, &message) + let message = state + .service + .create_message(p.context_id, message) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(message) @@ -565,7 +574,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { async move { let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?; let msg = state - .redis + .service .load_message(p.context_id, p.caller_id, p.id) .await .map_err(storage_err)?; diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..d91520b --- /dev/null +++ b/src/service.rs @@ -0,0 +1,97 @@ +use crate::dag::{build_flow_dag, DagResult, FlowDag}; +use crate::models::{Actor, Context, Flow, Job, Message, Runner}; +use crate::storage::RedisDriver; + +pub type BoxError = Box; + +pub struct AppService { + redis: RedisDriver, +} + +impl AppService { + pub fn new(redis: RedisDriver) -> Self { + Self { redis } + } + + // ----------------------------- + // Context + // ----------------------------- + pub async fn create_context(&self, ctx: Context) -> Result { + self.redis.save_context(&ctx).await?; + Ok(ctx) + } + + pub async fn load_context(&self, id: u32) -> Result { + let ctx = self.redis.load_context(id).await?; + Ok(ctx) + } + + // ----------------------------- + // Actor + // ----------------------------- + pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result { + self.redis.save_actor(context_id, &actor).await?; + Ok(actor) + } + + pub async fn load_actor(&self, context_id: u32, id: u32) -> Result { + let actor = self.redis.load_actor(context_id, id).await?; + Ok(actor) + } + + // ----------------------------- + // Runner + // ----------------------------- + pub async fn create_runner(&self, context_id: u32, runner: Runner) -> Result { + self.redis.save_runner(context_id, &runner).await?; + Ok(runner) + } + + pub async fn load_runner(&self, context_id: u32, id: u32) -> Result { + let runner = self.redis.load_runner(context_id, id).await?; + Ok(runner) + } + + // ----------------------------- + // Flow + // ----------------------------- + pub async fn create_flow(&self, context_id: u32, flow: Flow) -> Result { + self.redis.save_flow(context_id, &flow).await?; + Ok(flow) + } + + pub async fn load_flow(&self, context_id: u32, id: u32) -> Result { + let flow = self.redis.load_flow(context_id, id).await?; + Ok(flow) + } + + pub async fn flow_dag(&self, context_id: u32, flow_id: u32) -> DagResult { + build_flow_dag(&self.redis, context_id, flow_id).await + } + + // ----------------------------- + // Job + // ----------------------------- + pub async fn create_job(&self, context_id: u32, job: Job) -> Result { + self.redis.save_job(context_id, &job).await?; + Ok(job) + } + + pub async fn load_job(&self, context_id: u32, caller_id: u32, id: u32) -> Result { + let job = self.redis.load_job(context_id, caller_id, id).await?; + Ok(job) + } + + // ----------------------------- + // Message + // ----------------------------- + pub async fn create_message(&self, context_id: u32, message: Message) -> Result { + self.redis.save_message(context_id, &message).await?; + Ok(message) + } + + pub async fn load_message(&self, context_id: u32, caller_id: u32, id: u32) -> Result { + let msg = self.redis.load_message(context_id, caller_id, id).await?; + Ok(msg) + } +} \ No newline at end of file