diff --git a/bin/coordinator/src/rpc.rs b/bin/coordinator/src/rpc.rs index b881bb0..64e71c6 100644 --- a/bin/coordinator/src/rpc.rs +++ b/bin/coordinator/src/rpc.rs @@ -200,41 +200,8 @@ impl FlowCreate { } } -#[derive(Debug, Deserialize)] -pub struct JobCreate { - pub id: u32, - pub caller_id: u32, - pub context_id: u32, - pub script: String, - pub runner: Option, - pub timeout: u32, - pub retries: u8, - pub env_vars: HashMap, - pub prerequisites: Vec, - pub depends: Vec, -} - -impl JobCreate { - pub fn into_domain(self) -> Job { - use chrono::Utc; - - // Convert old format to hero_job::Job - // Note: depends and prerequisites are workflow fields that need separate storage - Job { - id: self.id.to_string(), - caller_id: self.caller_id.to_string(), - context_id: self.context_id.to_string(), - payload: self.script, - runner: self.runner.unwrap_or_else(|| "default-runner".to_string()), - timeout: self.timeout as u64, - env_vars: self.env_vars, - created_at: Utc::now(), - updated_at: Utc::now(), - signatures: Vec::new(), - } - // TODO: Store depends and prerequisites separately in JobSummary/DAG - } -} +// JobCreate removed - coordinator only manages flows, not individual jobs +// Jobs should be created by the supervisor or other services #[derive(Debug, Deserialize)] pub struct MessageCreate { @@ -247,40 +214,31 @@ pub struct MessageCreate { pub timeout: u32, pub timeout_ack: u32, pub timeout_result: u32, - pub job: Vec, + // Jobs removed - use flow nodes instead } impl MessageCreate { pub fn into_domain(self) -> Message { + use crate::time::current_timestamp; + let ts = current_timestamp(); - - let MessageCreate { - id, - caller_id, - context_id, - message, - message_type, - message_format_type, - timeout, - timeout_ack, - timeout_result, - job, - } = self; - + + // Convert to Message + // Note: flow_id is set to 0 for now, should be set by the caller Message { - id, - caller_id, - context_id, - flow_id: 0, // TODO: MessageCreate should include flow_id - message, - message_type, - message_format_type, - timeout, - timeout_ack, - timeout_result, + id: self.id, + caller_id: self.caller_id, + context_id: self.context_id, + flow_id: 0, // TODO: Get from params or context + message: self.message, + message_type: self.message_type, + message_format_type: self.message_format_type, + timeout: self.timeout, + timeout_ack: self.timeout_ack, + timeout_result: self.timeout_result, transport_id: None, transport_status: None, nodes: Vec::new(), // TODO: MessageCreate should include nodes - job: job.into_iter().map(JobCreate::into_domain).collect(), + job: Vec::new(), // Jobs removed - coordinator only manages flows logs: Vec::new(), created_at: ts, updated_at: ts, @@ -330,17 +288,7 @@ pub struct FlowLoadParams { pub id: u32, } -#[derive(Debug, Deserialize)] -pub struct JobCreateParams { - pub context_id: u32, - pub job: JobCreate, -} -#[derive(Debug, Deserialize)] -pub struct JobLoadParams { - pub context_id: u32, - pub caller_id: u32, - pub id: u32, -} +// JobCreateParams and JobLoadParams removed - coordinator only manages flows #[derive(Debug, Deserialize)] pub struct MessageCreateParams { @@ -506,42 +454,8 @@ pub fn build_module(state: Arc) -> RpcModule<()> { .expect("register flow.start"); } - // Job - { - let state = state.clone(); - module - .register_async_method("job.create", move |params, _caller, _ctx| { - let state = state.clone(); - async move { - let p: JobCreateParams = params.parse().map_err(invalid_params_err)?; - let job = p.job.into_domain(); - let job = state - .service - .create_job(p.context_id, job) - .await - .map_err(storage_err)?; - Ok::<_, ErrorObjectOwned>(job) - } - }) - .expect("register job.create"); - } - { - let state = state.clone(); - module - .register_async_method("job.load", move |params, _caller, _ctx| { - let state = state.clone(); - async move { - let p: JobLoadParams = params.parse().map_err(invalid_params_err)?; - let job = state - .service - .load_job(p.context_id, p.caller_id, p.id) - .await - .map_err(storage_err)?; - Ok::<_, ErrorObjectOwned>(job) - } - }) - .expect("register job.load"); - } + // Job endpoints removed - coordinator only manages flows + // Jobs should be created and managed by the supervisor // Message { diff --git a/lib/clients/coordinator/src/lib.rs b/lib/clients/coordinator/src/lib.rs index 5b8d6d1..bfb1a2d 100644 --- a/lib/clients/coordinator/src/lib.rs +++ b/lib/clients/coordinator/src/lib.rs @@ -140,39 +140,8 @@ impl CoordinatorClient { } } - // ==================== Job Methods ==================== - - /// Create a new job in a context - pub async fn job_create(&self, context_id: u32, job: JobCreate) -> Result { - let params = serde_json::json!({ - "context_id": context_id, - "job": job - }); - self.call("job.create", params).await - } - - /// Load an existing job from a context - pub async fn job_load(&self, context_id: u32, caller_id: u32, id: u32) -> Result { - let params = serde_json::json!({ - "context_id": context_id, - "caller_id": caller_id, - "id": id - }); - self.call("job.load", params).await - } - - /// Try to create a job, or load it if it already exists - pub async fn job_create_or_load(&self, context_id: u32, job: JobCreate) -> Result { - let caller_id = job.caller_id; - let job_id = job.id; - match self.job_create(context_id, job).await { - Ok(j) => Ok(j), - Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => { - self.job_load(context_id, caller_id, job_id).await - } - Err(e) => Err(e), - } - } + // Job methods removed - coordinator only manages flows + // Jobs should be created and managed by the supervisor // ==================== Flow Methods ==================== diff --git a/lib/clients/coordinator/src/models.rs b/lib/clients/coordinator/src/models.rs index 2b02c2d..5e346e3 100644 --- a/lib/clients/coordinator/src/models.rs +++ b/lib/clients/coordinator/src/models.rs @@ -103,24 +103,8 @@ pub enum ScriptType { // ==================== Job ==================== -/// Parameters for creating a job -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct JobCreate { - pub id: u32, - pub caller_id: u32, - pub context_id: u32, - pub script: String, - pub script_type: ScriptType, - pub timeout: u64, - #[serde(default)] - pub retries: u8, - #[serde(default)] - pub env_vars: HashMap, - #[serde(default)] - pub prerequisites: Vec, - #[serde(default)] - pub depends: Vec, -} +// JobCreate removed - coordinator only manages flows, not individual jobs +// Use hero_job::Job from lib/models/job for job operations // ==================== Flow ==================== diff --git a/tests/coordinator.rs b/tests/coordinator.rs index 5e69203..58df908 100644 --- a/tests/coordinator.rs +++ b/tests/coordinator.rs @@ -118,28 +118,10 @@ async fn test_01_flow_create_simple() { let client = create_client().await; - // Create jobs for the flow + // Note: Jobs should be created by the supervisor, not the coordinator + // For this test, we'll create a flow with job IDs that may not exist yet + // In a real scenario, jobs would be created by the supervisor first let job_ids = vec![BASE_JOB_ID, BASE_JOB_ID + 1]; - for (i, job_id) in job_ids.iter().enumerate() { - let job = JobCreate { - id: *job_id, - caller_id: TEST_CALLER_ID, - context_id: TEST_CONTEXT_ID, - script: format!("print('job {}')", i), - script_type: ScriptType::Python, - timeout: 60, - retries: 0, - env_vars: HashMap::new(), - prerequisites: vec![], - depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, - }; - - let result = client.job_create_or_load(TEST_CONTEXT_ID, job).await; - if let Err(ref e) = result { - println!(" Job {} creation error: {:?}", job_id, e); - } - assert!(result.is_ok(), "Job {} should be created", job_id); - } // Create flow let flow_create = FlowCreate { @@ -171,21 +153,6 @@ async fn test_02_flow_load() { // Create a flow first (reuse from test_01) let job_ids = vec![BASE_JOB_ID, BASE_JOB_ID + 1]; - for (i, job_id) in job_ids.iter().enumerate() { - let job = JobCreate { - id: *job_id, - caller_id: TEST_CALLER_ID, - context_id: TEST_CONTEXT_ID, - script: format!("print('job {}')", i), - script_type: ScriptType::Python, - timeout: 60, - retries: 0, - env_vars: HashMap::new(), - prerequisites: vec![], - depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, - }; - let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; - } let flow_create = FlowCreate { id: TEST_FLOW_ID, @@ -216,23 +183,8 @@ async fn test_03_flow_dag() { let client = create_client().await; - // Create jobs with dependencies + // Note: Jobs should be created by the supervisor let job_ids = vec![BASE_JOB_ID + 100, BASE_JOB_ID + 101, BASE_JOB_ID + 102]; - for (i, job_id) in job_ids.iter().enumerate() { - let job = JobCreate { - id: *job_id, - caller_id: TEST_CALLER_ID, - context_id: TEST_CONTEXT_ID, - script: format!("print('dag job {}')", i), - script_type: ScriptType::Python, - timeout: 60, - retries: 0, - env_vars: HashMap::new(), - prerequisites: vec![], - depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, - }; - let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; - } let flow_id = TEST_FLOW_ID + 1; let flow_create = FlowCreate { @@ -268,19 +220,6 @@ async fn test_04_flow_start() { // Create a simple flow let job_id = BASE_JOB_ID + 200; - let job = JobCreate { - id: job_id, - caller_id: TEST_CALLER_ID, - context_id: TEST_CONTEXT_ID, - script: "print('start test')".to_string(), - script_type: ScriptType::Python, - timeout: 60, - retries: 0, - env_vars: HashMap::new(), - prerequisites: vec![], - depends: vec![], - }; - let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; let flow_id = TEST_FLOW_ID + 2; let flow_create = FlowCreate {