feat(11-C4): play_run_async + play_wait + span_push + flow_library_search + play_start routing #20

Merged
timur merged 1 commit from feat/11-phase-c4-rpcs-sse-routing into development 2026-05-05 12:58:25 +00:00

Summary

Phase C4 of #11. Wires the executor (C3) into the LogicService trait so clients can drive Python flows over RPC, and routes the existing play_start to the new path when a WorkflowVersion sets python_source. SSE endpoint deliberately deferred — the broadcast plumbing is its own concern and Story 2 (#12) drives the UI that consumes it.

What lands

| RPC | Behavior |
|---|---|
| `play_run_async(workflow_sid, input_data, parent_span_id)` → `str` | Non-blocking. Validates → creates Play → spawns executor in tokio task → returns play_sid. `parent_span_id` (empty for top-level) becomes the parent of the child flow's root span via `HERO_FLOW_PARENT_SPAN`. Rejects DAG-only versions. |
| `play_wait(play_sid, timeout_ms)` → `Play` | Server-side block until terminal status or timeout. `timeout_ms == 0` ⇒ wait forever. 50ms polling. |
| `span_push(play_sid, event_json)` → `bool` | Internal: applies one JSONL span event through the same persistence path the listener uses. For tests; production code should not call it. |
| `flow_library_search(query, limit)` → `[str]` | Substring match on `name` + `description`, case-insensitive, name matches weighted 3×, newest-first within the same score bucket. v1: no fuzzy matching, no vector similarity. |
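
The launch-then-wait pattern behind `play_run_async` and `play_wait` can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: it swaps the tokio task for `std::thread` so it runs without dependencies, the in-memory `PlayStore` and the `run_async` / `wait` names are hypothetical, and it assumes that a timeout simply returns the play in whatever non-terminal state it is in.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStatus {
    Running,
    Success,
    Failed,
    Cancelled,
    TimedOut,
}

impl ExecutionStatus {
    /// Everything except Running counts as terminal.
    pub fn is_terminal(self) -> bool {
        !matches!(self, ExecutionStatus::Running)
    }
}

/// Hypothetical in-memory stand-in for the Play store (play_sid -> status).
pub type PlayStore = Arc<Mutex<HashMap<String, ExecutionStatus>>>;

/// Record the play as Running, start the work in the background,
/// and return the play_sid without waiting for the work to finish.
pub fn run_async<F>(store: &PlayStore, play_sid: &str, work: F) -> String
where
    F: FnOnce() -> ExecutionStatus + Send + 'static,
{
    store
        .lock()
        .unwrap()
        .insert(play_sid.to_string(), ExecutionStatus::Running);
    let (bg_store, bg_sid) = (Arc::clone(store), play_sid.to_string());
    thread::spawn(move || {
        let status = work();
        bg_store.lock().unwrap().insert(bg_sid, status);
    });
    play_sid.to_string()
}

/// Poll every 50 ms until the play is terminal or the deadline passes.
/// timeout_ms == 0 means "wait forever". Returns None for an unknown play_sid.
pub fn wait(store: &PlayStore, play_sid: &str, timeout_ms: u64) -> Option<ExecutionStatus> {
    let deadline = (timeout_ms > 0).then(|| Instant::now() + Duration::from_millis(timeout_ms));
    loop {
        let status = *store.lock().unwrap().get(play_sid)?;
        if status.is_terminal() {
            return Some(status);
        }
        if let Some(d) = deadline {
            if Instant::now() >= d {
                return Some(status); // assumed: report the current state on timeout
            }
        }
        thread::sleep(Duration::from_millis(50));
    }
}
```

With a fixed 50 ms interval a watcher can observe completion up to one interval late, which is the trade-off the polling approach accepts for its simplicity.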

play_start routing

When the resolved WorkflowVersion.python_source is non-empty, play_start skips DAG validation entirely (nodes/edges aren't expected to populate for Python flows) and routes to spawn_python_flow. The DAG path stays as-is for old flows; Phase D removes it.
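
As a rough sketch of that branch: the struct below is a hypothetical, trimmed-down stand-in for `WorkflowVersion`, and `route_for`, `Route`, and the placeholder `validate_dag` check are illustrative names rather than the PR's actual code. The point it shows is only the ordering: the `python_source` check comes first, so a Python version never reaches DAG validation.

```rust
/// Hypothetical, trimmed-down stand-in for the real WorkflowVersion record.
#[derive(Debug, Default)]
pub struct WorkflowVersion {
    pub python_source: String,
    pub nodes: Vec<String>,
    pub edges: Vec<(String, String)>,
}

/// Which execution path a version takes.
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    PythonFlow,
    Dag,
}

/// Pick the route. Python versions return before any DAG validation runs.
pub fn route_for(version: &WorkflowVersion) -> Result<Route, String> {
    if !version.python_source.is_empty() {
        return Ok(Route::PythonFlow);
    }
    validate_dag(version)?;
    Ok(Route::Dag)
}

/// Placeholder DAG check (an assumption for this sketch):
/// every edge must reference nodes that exist.
fn validate_dag(version: &WorkflowVersion) -> Result<(), String> {
    for (from, to) in &version.edges {
        if !version.nodes.contains(from) || !version.nodes.contains(to) {
            return Err(format!("edge {from} -> {to} references an unknown node"));
        }
    }
    Ok(())
}
```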

span_socket refactor

Extracted apply_event() body into a free apply_event_to_play so both SpanListener and the new public apply_event_json (which span_push calls) share one implementation. No behavior change.

spawn_python_flow helper

Outcome → ExecutionStatus mapping:

| Outcome | Status | Notes |
|---|---|---|
| `Ok(outcome)` + `timed_out` | `TimedOut` | error_message: "Wall-clock timeout exceeded" |
| `Ok(outcome)` + `exit_code == Some(0)` | `Success` | |
| Non-zero exit | `Failed` | stderr captured into error_message |
| Spawn itself errored | `Failed` | |

Always sets completed_at and duration_ms so the UI can render a finished play correctly.
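
The mapping can be expressed as a single `match`, sketched below. The `Outcome` struct and `map_outcome` function are hypothetical shapes chosen for illustration, and two details are assumptions not stated in this PR: a missing exit code (for example, a process killed by a signal) is treated like a non-zero exit, and the spawn error text is placed in the error message.

```rust
/// Trimmed to the statuses this mapping can produce.
#[derive(Debug, PartialEq, Eq)]
pub enum ExecutionStatus {
    Success,
    Failed,
    TimedOut,
}

/// Hypothetical shape of what the executor reports for a finished subprocess.
pub struct Outcome {
    pub timed_out: bool,
    pub exit_code: Option<i32>,
    pub stderr: String,
}

/// Map an executor result to (status, error_message).
/// The timeout check comes first, so a timed-out run is never reported as Success.
pub fn map_outcome(result: Result<Outcome, String>) -> (ExecutionStatus, Option<String>) {
    match result {
        Ok(o) if o.timed_out => (
            ExecutionStatus::TimedOut,
            Some("Wall-clock timeout exceeded".to_string()),
        ),
        Ok(o) if o.exit_code == Some(0) => (ExecutionStatus::Success, None),
        // Non-zero exit; a missing exit code is assumed to land here too.
        Ok(o) => (ExecutionStatus::Failed, Some(o.stderr)),
        // Spawn failure; keeping the error text is an assumption of this sketch.
        Err(e) => (ExecutionStatus::Failed, Some(e)),
    }
}
```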

Tests (12 in c4_tests module)

  • 5× flow_library_search: name-vs-description ranking, case insensitivity, limit, empty query, exclusion of non-matches
  • 2× play_wait: terminal-immediate-return (~ms), timeout-bound (~200ms)
  • 2× span_push: full lifecycle round-trip, malformed JSON rejection
  • 3× routing: play_run_async Python path creates Running Play, play_run_async DAG workflow rejected with InvalidInput, play_start routes by python_source non-empty

What's NOT here

  • SSE endpoint GET /api/plays/:sid/spans (separate PR — needs broadcast channel through SpanListener and a UI route in hero_logic_ui)
  • Migration tool / DAG deletion (Phase D)

Phase plan (#11)

  • A — schema additive (#15, merged)
  • B — hero_tracing.py SDK (#16, merged)
  • C1 — staging (#17, merged)
  • C2 — span socket listener (#18, merged)
  • C3 — subprocess + Tier 0 sandbox (#19, merged)
  • C4 — this PR — RPCs + play_start routing
  • C5 (future) — SSE
  • D — migration + delete DAG

Test plan

  • cargo test -p hero_logic --lib c4_tests — 12/12 pass
  • cargo test --workspace --lib — 44 total, all green
  • cargo build --workspace clean

🤖 Generated with Claude Code

Phase C4 of #11. Wires the executor (C3) into the LogicService trait so
clients can actually drive Python flows over RPC, and routes the
existing play_start to the new path when a WorkflowVersion sets
python_source. SSE endpoint deliberately deferred to a follow-up — the
broadcast plumbing is its own concern and Story 2 (#12) drives the UI
that consumes it.

Schema (logic.oschema → regenerated artifacts):

- play_run_async(workflow_sid, input_data, parent_span_id) -> str
  Non-blocking. Validates inputs, creates the Play row, kicks off the
  PythonFlowExecutor in a background tokio task, returns play_sid
  immediately. parent_span_id (empty for top-level launches) becomes
  the parent of the child flow's root span via HERO_FLOW_PARENT_SPAN.
  Rejects DAG-only workflow versions — DAG flows use play_start.

- play_wait(play_sid, timeout_ms) -> Play
  Server-side block until the play reaches a terminal status
  (Success | Failed | Cancelled | TimedOut) or the timeout fires.
  timeout_ms == 0 means "wait forever". Polling-based at 50ms — fine
  for the expected call rate (one watcher per play).

- span_push(play_sid, event_json) -> bool
  Internal — applies a single JSONL span event to a Play through the
  same persistence path the executor's socket listener uses. Exists so
  tests can drive the persistence layer without spawning a Python
  subprocess. Production code should not call this; the executor owns
  the socket.

- flow_library_search(query, limit) -> [str]
  Keyword search across saved Workflow records. Substring match in
  name + description, case-insensitive, ranked by total match count
  (name matches weighted 3× over description). Newest-first within the
  same score bucket. v1 — no fuzzy / no vector similarity; semantic
  follow-up tracked separately. Used by Service Agent v3 (#13).
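
The ranking described for flow_library_search can be sketched as below. This is an illustration under stated assumptions, not the PR's implementation: the `Workflow` struct is a hypothetical stand-in, the returned strings are assumed to be workflow sids, `created_at` is assumed to be a timestamp where larger means newer, and an empty query is assumed to return no results (the tests cover the empty-query case, but its expected result is not stated here).

```rust
/// Hypothetical, trimmed-down stand-in for a saved Workflow record.
pub struct Workflow {
    pub sid: String,
    pub name: String,
    pub description: String,
    pub created_at: u64, // assumed: larger means newer
}

/// Count non-overlapping occurrences of an already-lowercased needle,
/// ignoring the case of the haystack.
fn count_matches(haystack: &str, needle: &str) -> usize {
    haystack.to_lowercase().matches(needle).count()
}

/// Score = 3 * name matches + description matches.
/// Results are ordered by score, then newest-first, then cut to `limit`.
pub fn search(workflows: &[Workflow], query: &str, limit: usize) -> Vec<String> {
    let needle = query.to_lowercase();
    if needle.is_empty() {
        return Vec::new(); // assumption: an empty query matches nothing
    }
    let mut scored: Vec<(usize, u64, &str)> = workflows
        .iter()
        .filter_map(|w| {
            let score =
                3 * count_matches(&w.name, &needle) + count_matches(&w.description, &needle);
            // Non-matching workflows are excluded entirely.
            (score > 0).then(|| (score, w.created_at, w.sid.as_str()))
        })
        .collect();
    scored.sort_by(|a, b| b.0.cmp(&a.0).then(b.1.cmp(&a.1)));
    scored
        .into_iter()
        .take(limit)
        .map(|(_, _, sid)| sid.to_string())
        .collect()
}
```

Because the score is a plain match count, a workflow that mentions the query three times in its description ties with one that has it once in its name; the newest-first tiebreak then decides the order.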

play_start routing:

When the resolved WorkflowVersion has python_source non-empty, skip
the DAG validation step entirely (nodes/edges are not expected to
populate for Python flows) and route to spawn_python_flow. The DAG
path stays as-is for old flows; Phase D removes it.

span_socket refactor:

apply_event() body is now a free `apply_event_to_play` function so
both SpanListener and the new public `apply_event_json` helper share
one implementation. apply_event_json is the entry point span_push
calls.

spawn_python_flow helper:

Sets Play.completed_at + duration_ms unconditionally. Maps outcome →
ExecutionStatus:
- Ok(outcome) with timed_out ⇒ TimedOut + "Wall-clock timeout exceeded"
- Ok(outcome) with exit_code == Some(0) ⇒ Success
- non-zero exit ⇒ Failed (stderr captured into error_message)
- spawn error itself ⇒ Failed

Tests (12 in c4_tests module):

- 5× flow_library_search: name-vs-description ranking, case
  insensitivity, limit, empty query, exclusion of non-matches
- 2× play_wait: terminal-immediate-return, timeout-bound
- 2× span_push: full lifecycle round-trip, malformed JSON rejection
- 3× play_run_async / play_start routing: Python path creates Running
  Play with correct workflow_sid, DAG workflow rejected with
  InvalidInput, play_start routes by python_source non-empty

What's NOT here:

- SSE endpoint /api/plays/:sid/spans (separate PR — needs broadcast
  channel plumbing through SpanListener and a UI route in
  hero_logic_ui; Story 2 / #12 drives consumption)
- Migration tool / DAG deletion (Phase D)

Refs hero_logic#11 (Story 1: Foundation), hero_logic#10 (epic).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
timur merged commit 0496f57cfb into development 2026-05-05 12:58:25 +00:00
Reference
lhumina_code/hero_logic!20