Add service layer to abstract business logic
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
pub mod models;
|
pub mod models;
|
||||||
pub mod storage;
|
pub mod storage;
|
||||||
|
pub mod service;
|
||||||
mod time;
|
mod time;
|
||||||
pub mod dag;
|
pub mod dag;
|
||||||
pub mod rpc;
|
pub mod rpc;
|
||||||
|
@@ -82,8 +82,11 @@ async fn main() {
|
|||||||
.await
|
.await
|
||||||
.expect("Failed to connect to Redis");
|
.expect("Failed to connect to Redis");
|
||||||
|
|
||||||
|
// Initialize Service
|
||||||
|
let service = herocoordinator::service::AppService::new(redis);
|
||||||
|
|
||||||
// Shared application state
|
// Shared application state
|
||||||
let state = Arc::new(herocoordinator::rpc::AppState::new(redis));
|
let state = Arc::new(herocoordinator::rpc::AppState::new(service));
|
||||||
|
|
||||||
// Build RPC modules for both servers
|
// Build RPC modules for both servers
|
||||||
let http_module = herocoordinator::rpc::build_module(state.clone());
|
let http_module = herocoordinator::rpc::build_module(state.clone());
|
||||||
|
67
src/rpc.rs
67
src/rpc.rs
@@ -13,19 +13,19 @@ use serde::Deserialize;
|
|||||||
use serde_json::{Value, json};
|
use serde_json::{Value, json};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
dag::{DagError, FlowDag, build_flow_dag},
|
dag::{DagError, FlowDag},
|
||||||
models::{Actor, Context, Flow, Job, Message, MessageFormatType, Runner, ScriptType},
|
models::{Actor, Context, Flow, Job, Message, MessageFormatType, Runner, ScriptType},
|
||||||
storage::RedisDriver,
|
service::AppService,
|
||||||
time::current_timestamp,
|
time::current_timestamp,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct AppState {
|
pub struct AppState {
|
||||||
pub redis: RedisDriver,
|
pub service: AppService,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
pub fn new(redis: RedisDriver) -> Self {
|
pub fn new(service: AppService) -> Self {
|
||||||
Self { redis }
|
Self { service }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -354,9 +354,9 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?;
|
let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let actor = p.actor.into_domain().map_err(invalid_params_err)?;
|
let actor = p.actor.into_domain().map_err(invalid_params_err)?;
|
||||||
state
|
let actor = state
|
||||||
.redis
|
.service
|
||||||
.save_actor(p.context_id, &actor)
|
.create_actor(p.context_id, actor)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(actor)
|
Ok::<_, ErrorObjectOwned>(actor)
|
||||||
@@ -372,7 +372,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let actor = state
|
let actor = state
|
||||||
.redis
|
.service
|
||||||
.load_actor(p.context_id, p.id)
|
.load_actor(p.context_id, p.id)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
@@ -391,7 +391,11 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?;
|
let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let ctx = p.context.into_domain().map_err(invalid_params_err)?;
|
let ctx = p.context.into_domain().map_err(invalid_params_err)?;
|
||||||
state.redis.save_context(&ctx).await.map_err(storage_err)?;
|
let ctx = state
|
||||||
|
.service
|
||||||
|
.create_context(ctx)
|
||||||
|
.await
|
||||||
|
.map_err(storage_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(ctx)
|
Ok::<_, ErrorObjectOwned>(ctx)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -404,7 +408,11 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
async move {
|
async move {
|
||||||
let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let ctx = state.redis.load_context(p.id).await.map_err(storage_err)?;
|
let ctx = state
|
||||||
|
.service
|
||||||
|
.load_context(p.id)
|
||||||
|
.await
|
||||||
|
.map_err(storage_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(ctx)
|
Ok::<_, ErrorObjectOwned>(ctx)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -420,9 +428,9 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?;
|
let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let runner = p.runner.into_domain().map_err(invalid_params_err)?;
|
let runner = p.runner.into_domain().map_err(invalid_params_err)?;
|
||||||
state
|
let runner = state
|
||||||
.redis
|
.service
|
||||||
.save_runner(p.context_id, &runner)
|
.create_runner(p.context_id, runner)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(runner)
|
Ok::<_, ErrorObjectOwned>(runner)
|
||||||
@@ -438,7 +446,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let runner = state
|
let runner = state
|
||||||
.redis
|
.service
|
||||||
.load_runner(p.context_id, p.id)
|
.load_runner(p.context_id, p.id)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
@@ -457,9 +465,9 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?;
|
let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let flow = p.flow.into_domain().map_err(invalid_params_err)?;
|
let flow = p.flow.into_domain().map_err(invalid_params_err)?;
|
||||||
state
|
let flow = state
|
||||||
.redis
|
.service
|
||||||
.save_flow(p.context_id, &flow)
|
.create_flow(p.context_id, flow)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(flow)
|
Ok::<_, ErrorObjectOwned>(flow)
|
||||||
@@ -475,7 +483,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let flow = state
|
let flow = state
|
||||||
.redis
|
.service
|
||||||
.load_flow(p.context_id, p.id)
|
.load_flow(p.context_id, p.id)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
@@ -491,7 +499,9 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
async move {
|
async move {
|
||||||
let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let dag: FlowDag = build_flow_dag(&state.redis, p.context_id, p.id)
|
let dag: FlowDag = state
|
||||||
|
.service
|
||||||
|
.flow_dag(p.context_id, p.id)
|
||||||
.await
|
.await
|
||||||
.map_err(dag_err)?;
|
.map_err(dag_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(dag)
|
Ok::<_, ErrorObjectOwned>(dag)
|
||||||
@@ -509,12 +519,11 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: JobCreateParams = params.parse().map_err(invalid_params_err)?;
|
let p: JobCreateParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let job = p.job.into_domain().map_err(invalid_params_err)?;
|
let job = p.job.into_domain().map_err(invalid_params_err)?;
|
||||||
state
|
let job = state
|
||||||
.redis
|
.service
|
||||||
.save_job(p.context_id, &job)
|
.create_job(p.context_id, job)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
|
|
||||||
Ok::<_, ErrorObjectOwned>(job)
|
Ok::<_, ErrorObjectOwned>(job)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -528,7 +537,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: JobLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: JobLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let job = state
|
let job = state
|
||||||
.redis
|
.service
|
||||||
.load_job(p.context_id, p.caller_id, p.id)
|
.load_job(p.context_id, p.caller_id, p.id)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
@@ -547,9 +556,9 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?;
|
let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let message = p.message.into_domain().map_err(invalid_params_err)?;
|
let message = p.message.into_domain().map_err(invalid_params_err)?;
|
||||||
state
|
let message = state
|
||||||
.redis
|
.service
|
||||||
.save_message(p.context_id, &message)
|
.create_message(p.context_id, message)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
Ok::<_, ErrorObjectOwned>(message)
|
Ok::<_, ErrorObjectOwned>(message)
|
||||||
@@ -565,7 +574,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
async move {
|
async move {
|
||||||
let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?;
|
let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
let msg = state
|
let msg = state
|
||||||
.redis
|
.service
|
||||||
.load_message(p.context_id, p.caller_id, p.id)
|
.load_message(p.context_id, p.caller_id, p.id)
|
||||||
.await
|
.await
|
||||||
.map_err(storage_err)?;
|
.map_err(storage_err)?;
|
||||||
|
97
src/service.rs
Normal file
97
src/service.rs
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
use crate::dag::{build_flow_dag, DagResult, FlowDag};
|
||||||
|
use crate::models::{Actor, Context, Flow, Job, Message, Runner};
|
||||||
|
use crate::storage::RedisDriver;
|
||||||
|
|
||||||
|
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
|
||||||
|
|
||||||
|
pub struct AppService {
|
||||||
|
redis: RedisDriver,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppService {
|
||||||
|
pub fn new(redis: RedisDriver) -> Self {
|
||||||
|
Self { redis }
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------
|
||||||
|
// Context
|
||||||
|
// -----------------------------
|
||||||
|
pub async fn create_context(&self, ctx: Context) -> Result<Context, BoxError> {
|
||||||
|
self.redis.save_context(&ctx).await?;
|
||||||
|
Ok(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_context(&self, id: u32) -> Result<Context, BoxError> {
|
||||||
|
let ctx = self.redis.load_context(id).await?;
|
||||||
|
Ok(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------
|
||||||
|
// Actor
|
||||||
|
// -----------------------------
|
||||||
|
pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result<Actor, BoxError> {
|
||||||
|
self.redis.save_actor(context_id, &actor).await?;
|
||||||
|
Ok(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_actor(&self, context_id: u32, id: u32) -> Result<Actor, BoxError> {
|
||||||
|
let actor = self.redis.load_actor(context_id, id).await?;
|
||||||
|
Ok(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------
|
||||||
|
// Runner
|
||||||
|
// -----------------------------
|
||||||
|
pub async fn create_runner(&self, context_id: u32, runner: Runner) -> Result<Runner, BoxError> {
|
||||||
|
self.redis.save_runner(context_id, &runner).await?;
|
||||||
|
Ok(runner)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_runner(&self, context_id: u32, id: u32) -> Result<Runner, BoxError> {
|
||||||
|
let runner = self.redis.load_runner(context_id, id).await?;
|
||||||
|
Ok(runner)
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------
|
||||||
|
// Flow
|
||||||
|
// -----------------------------
|
||||||
|
pub async fn create_flow(&self, context_id: u32, flow: Flow) -> Result<Flow, BoxError> {
|
||||||
|
self.redis.save_flow(context_id, &flow).await?;
|
||||||
|
Ok(flow)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_flow(&self, context_id: u32, id: u32) -> Result<Flow, BoxError> {
|
||||||
|
let flow = self.redis.load_flow(context_id, id).await?;
|
||||||
|
Ok(flow)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn flow_dag(&self, context_id: u32, flow_id: u32) -> DagResult<FlowDag> {
|
||||||
|
build_flow_dag(&self.redis, context_id, flow_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------
|
||||||
|
// Job
|
||||||
|
// -----------------------------
|
||||||
|
pub async fn create_job(&self, context_id: u32, job: Job) -> Result<Job, BoxError> {
|
||||||
|
self.redis.save_job(context_id, &job).await?;
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_job(&self, context_id: u32, caller_id: u32, id: u32) -> Result<Job, BoxError> {
|
||||||
|
let job = self.redis.load_job(context_id, caller_id, id).await?;
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------
|
||||||
|
// Message
|
||||||
|
// -----------------------------
|
||||||
|
pub async fn create_message(&self, context_id: u32, message: Message) -> Result<Message, BoxError> {
|
||||||
|
self.redis.save_message(context_id, &message).await?;
|
||||||
|
Ok(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_message(&self, context_id: u32, caller_id: u32, id: u32) -> Result<Message, BoxError> {
|
||||||
|
let msg = self.redis.load_message(context_id, caller_id, id).await?;
|
||||||
|
Ok(msg)
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user