Story 1: Foundation — Python-flow executor + schema + span socket #11

Open
opened 2026-04-21 14:18:06 +00:00 by timur · 2 comments
Owner

Goal

Replace the DAG executor with a Python-flow executor that listens on a span socket and persists the span tree to the Play record. Also ship hero_tracing.py — the generic Python module that flow authors import to get @flow, flow.step, instrument(), etc.

Parent epic: #10
Prerequisite: hero_rpc#29 (service methods in generated clients). hero_rpc#30 was closed — its scope moved here.

Scope

Schema (schemas/logic/logic.oschema)

  • WorkflowVersion.python_source: str replaces nodes: [Node] + edges: [Edge]
  • Play.spans: [Span] replaces node_runs: [NodeRun]
  • SpanStatus enum replaces ExecutionStatus (values: pending | running | ok | failed | timed_out | cancelled)
  • New Span type:
Span = {
    span_id: str
    parent_id: str          # empty for root
    name: str
    start_ms: u64
    end_ms: u64
    status: SpanStatus
    tags: str               # JSON-serialized key-values
    logs: str               # captured stdout/stderr within the span
    error: str              # exception + traceback
    rpc_service: str        # for rpc:* spans from instrumented clients
    rpc_method: str
    child_play_sid: str     # for sub-flow spans
}
  • Delete Node, Edge, NodeConfig, NodeRun, NodeType, ExecutionStatus
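
Because Play.spans is a flat list linked only by parent_id, any consumer has to rebuild the tree itself. The following is a minimal Python sketch of that reconstruction, not the project's code. The `children` field and the `build_tree` helper are illustrative assumptions, and only a subset of the Span fields is modeled.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Span:
    # Subset of the Span fields listed above; the rest are omitted for brevity.
    span_id: str
    parent_id: str  # empty string for the root span
    name: str
    status: str = "pending"
    # Helper for tree building only; not part of the schema.
    children: List["Span"] = field(default_factory=list)


def build_tree(spans: List[Span]) -> List[Span]:
    """Link a flat span list into trees via parent_id and return the roots."""
    by_id: Dict[str, Span] = {s.span_id: s for s in spans}
    roots: List[Span] = []
    for s in spans:
        parent = by_id.get(s.parent_id)
        if parent is None:
            # Empty or unknown parent_id: treat the span as a root.
            roots.append(s)
        else:
            parent.children.append(s)
    return roots


flat = [
    Span("s1", "", "my_flow"),
    Span("s2", "s1", "fetch"),
    Span("s3", "s2", "rpc:whiteboard.get"),
    Span("s4", "s1", "render"),
]
roots = build_tree(flat)
```

Treating an unknown parent_id as a root keeps the helper usable on a child Play whose root span points at a span stored in the parent Play.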

New service methods on LogicService

  • play_run_async(workflow_sid, input_data, parent_span_id) -> str — non-blocking; returns play_sid
  • play_wait(play_sid, timeout_ms) -> Play — block until terminal
  • span_push(play_sid, event_json) -> bool — internal, used by executor's socket listener but exposed for tests
  • flow_library_search(query, limit) -> [str] — keyword first; Story 3 lands the semantic-match version
  • (Existing play_start, play_status, play_retry, play_cancel, play_resume stay)

hero_tracing.py — new Python module

Lives at sdk/python/hero_tracing.py in the hero_logic repo. Embedded in the hero_logic_server binary via include_str! or rust-embed. Staged on startup at ~/.hero/var/flows/sdk/hero_tracing.py. The executor adds that path to PYTHONPATH when spawning flow subprocesses.

Contains:

@flow(name: str, inputs: dict = None, description: str = '')    # decorator
flow.step(name: str, **tags) -> context manager
flow.span(name: str, **tags) -> context manager                  # generic, if user wants
flow.Failed(reason: str)                                          # exception
flow.current_span                                                 # for mid-step tags/logs
instrument(client) -> proxy                                       # __getattr__-based wrapper

Plus plumbing:

  • Reads HERO_FLOW_SPAN_SOCK env var; if set, connects and writes JSONL span events
  • If unset (standalone-script execution of a @flow-decorated function), helpers are silent no-ops
  • contextvars-based parent-span propagation across asyncio.create_task / gather

Generic, service-agnostic, ~400-500 lines of stdlib-only Python.

Executor rewrite (src/engine/executor.rs)

  • On server startup: write hero_tracing.py to ~/.hero/var/flows/sdk/hero_tracing.py (mkdir, overwrite on each start so updates land)
  • On play_start:
    1. Validate inputs → create Play → create /tmp/spans-{play_sid}.sock UDS
    2. Spawn python3 via hero_proc with:
      • HERO_FLOW_SPAN_SOCK=/tmp/spans-{play_sid}.sock
      • PYTHONPATH prepended with ~/.hero/var/flows/sdk (so from hero_tracing import flow works) and ~/.hero/var/router/python (so service clients work)
      • Script = WorkflowVersion.python_source with a boot-stub appended: load inputs from an env var, call the decorated function
    3. Listen on the socket for JSONL events (span_start, span_end, span_log) — persist each to Play.spans incrementally; publish over SSE
    4. Handle child-play span propagation: when a child Play starts with a parent_span_id, its root span uses that as parent
    5. On subprocess exit: close the socket, compute final Play.status (ok if root span ok, else failed), persist total duration
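
Steps 3 and 5 above amount to folding a stream of JSONL events into span records and then deriving the Play status from the root span. The real executor is Rust; the Python sketch below only illustrates the protocol. The event field names (`type`, `text`, and so on) are assumptions, and `socket.socketpair()` stands in for the per-play Unix socket so the example is self-contained.

```python
import json
import socket


def apply_event(spans: dict, event: dict) -> None:
    """Fold one span event into the in-memory span table, keyed by span_id."""
    kind = event["type"]
    sid = event["span_id"]
    if kind == "span_start":
        spans[sid] = {
            "span_id": sid,
            "parent_id": event.get("parent_id", ""),
            "name": event["name"],
            "start_ms": event["start_ms"],
            "end_ms": 0,
            "status": "running",
            "logs": "",
        }
    elif sid not in spans:
        return  # log or end event for an unknown span: ignore it
    elif kind == "span_log":
        spans[sid]["logs"] += event["text"]
    elif kind == "span_end":
        spans[sid]["end_ms"] = event["end_ms"]
        spans[sid]["status"] = event["status"]


def consume(conn: socket.socket, spans: dict) -> None:
    """Read JSONL events from a connected socket until the peer closes it."""
    with conn.makefile("r", encoding="utf-8") as reader:
        for line in reader:
            if line.strip():
                apply_event(spans, json.loads(line))


def final_status(spans: dict) -> str:
    """ok if the root span (empty parent_id) ended ok, else failed."""
    roots = [s for s in spans.values() if s["parent_id"] == ""]
    return "ok" if roots and all(r["status"] == "ok" for r in roots) else "failed"


# Simulate a flow subprocess writing four events and then exiting.
writer, reader = socket.socketpair()
for ev in [
    {"type": "span_start", "span_id": "r", "parent_id": "", "name": "my_flow", "start_ms": 1},
    {"type": "span_start", "span_id": "a", "parent_id": "r", "name": "fetch", "start_ms": 2},
    {"type": "span_log", "span_id": "a", "text": "hello\n"},
    {"type": "span_end", "span_id": "a", "status": "ok", "end_ms": 3},
    {"type": "span_end", "span_id": "r", "status": "ok", "end_ms": 4},
]:
    writer.sendall((json.dumps(ev) + "\n").encode("utf-8"))
writer.close()

spans = {}
consume(reader, spans)
reader.close()
```

A production listener would persist each event as it arrives instead of keeping the table in memory, which is what the incremental persistence in step 3 calls for.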

Delete

  • src/engine/node_executors.rs
  • src/engine/template_loader.rs
  • src/engine/schema.rs (DAG-specific validation)
  • {{outputs.X}} / {{var}} interpolation paths
  • Anything referencing NodeType::Action | Conditional | Transform | Wait | HumanInput

Done when

  • cargo build clean after all deletions
  • ~/.hero/var/flows/sdk/hero_tracing.py is written on server startup
  • A Python file with @flow, 3 with flow.step(...) blocks, 1 instrument()ed client, and 6 RPC calls produces 9 spans (3 step + 6 rpc) under the flow's root span, persisted correctly in Play.spans
  • Running two play_run_async calls in parallel from a parent flow (with instrument()ed HeroLogicClient) produces a nested tree (parent → 2 children → their spans)
  • Same parent flow WITHOUT instrument() produces 2 disconnected child-play trees (confirming opt-in propagation)
  • SSE endpoint GET /api/plays/:sid/spans streams span events live as the flow runs
  • All existing RPC methods still respond (play_status, play_cancel, etc.) — just against the new data model
  • One-time migration tool to convert a DAG Workflow to a Python Workflow (best-effort; manual cleanup expected)

Estimate: ~2 weeks (unchanged — tracing module is ~500 lines of straightforward Python)

Author
Owner

Clarification on the tracing model (reflected in hero_rpc#30 after a design-review pushback):

  • Generated Python clients are pure RPC clients — no tracing code, works standalone.
  • The tracing helpers (@flow, flow.step, flow.Failed, instrument(), socket plumbing) live in the shared hero_tracing.py module that hero_rpc emits alongside each service's client.
  • Flow authors opt into per-RPC spans per-client via wb = instrument(HeroWhiteboardClient()). Unwrapped clients don't emit RPC spans.
  • This story's schema + executor scope is unchanged — we still add Span + Play.spans, still listen on HERO_FLOW_SPAN_SOCK, still collect events. What changes is where those events come from: flow.step always, instrument()-wrapped clients conditionally, unwrapped clients never.

The test fixture for this story's done-criteria changes accordingly: the sample flow should exercise all three tiers (flow.step blocks, instrument()-wrapped clients, and unwrapped clients) and produce the correct mix of step spans and rpc spans.
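
To make the opt-in behavior concrete, here is a minimal sketch of a `__getattr__`-based proxy. It is an assumption-laden illustration rather than the planned implementation: the real `instrument(client)` takes only the client, whereas this version takes an explicit `emit` callback so it runs on its own, and `FakeWhiteboardClient` is a hypothetical stand-in for a generated client.

```python
import functools


class _Instrumented:
    """Proxy that emits start/end events around each method call."""

    def __init__(self, client, emit):
        self._client = client
        self._emit = emit

    def __getattr__(self, name):
        # Called only for attributes not found on the proxy itself.
        target = getattr(self._client, name)
        if not callable(target):
            return target

        @functools.wraps(target)
        def wrapper(*args, **kwargs):
            service = type(self._client).__name__
            span_name = f"rpc:{service}.{name}"
            self._emit({"type": "span_start", "name": span_name,
                        "rpc_service": service, "rpc_method": name})
            status = "failed"
            try:
                result = target(*args, **kwargs)
                status = "ok"
                return result
            finally:
                self._emit({"type": "span_end", "name": span_name, "status": status})

        return wrapper


def instrument(client, emit):
    # Assumption: the extra `emit` parameter exists only for this sketch.
    return _Instrumented(client, emit)


class FakeWhiteboardClient:
    """Hypothetical stand-in for a generated, tracing-free RPC client."""

    def get_board(self, board_id):
        return {"id": board_id}


events = []
wrapped = instrument(FakeWhiteboardClient(), events.append)
plain = FakeWhiteboardClient()

wrapped_result = wrapped.get_board("b1")  # emits span_start + span_end
plain_result = plain.get_board("b1")      # emits nothing
```

The unwrapped client has no reference to the emitter at all, which is what keeps generated clients usable standalone.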

Author
Owner

Schema addendum — enriched Span fields

Story 2's viewer (#12) is being scoped as a graph-style rendering (Cytoscape, same visual richness as the old DAG editor — node cards with inputs/outputs/function-sig, edges showing parent-child + fan-out + sequential, drawer for details). To feed that viewer, the Span type needs fields beyond the minimal set in the issue body.

Add to Span:

Span = {
    span_id: str
    parent_id: str
    name: str
    start_ms: u64
    end_ms: u64
    status: SpanStatus
    tags: str               # JSON-serialized key-values (existing)
    logs: str               # captured stdout/stderr (existing)
    error: str              # exception + traceback (existing)
    rpc_service: str
    rpc_method: str
    child_play_sid: str

    # NEW — feed the graph-style viewer
    args: str               # JSON-serialized input to the span.
                            #   For `with flow.step("x")`: the tags dict at span open (from flow.step(name, **tags))
                            #   For rpc:* spans (instrument()-wrapped): the request body / method args
                            #   For @flow root: the declared inputs dict
    result: str             # JSON-serialized output.
                            #   For `with flow.step`: tags recorded via flow.current_span.record(**kv) at close
                            #   For rpc:* spans: the response body
                            #   For @flow root: the function's return value
    source_file: str        # the .py file that opened this span (basename is enough)
    source_line: u32        # line number where `with flow.step(...)` or the wrapped method-call appeared
                            #   Captured via inspect.stack() in hero_tracing.py; zero when unavailable.
    kind: SpanKind          # enum: flow_root | step | rpc | subflow | other — viewer picks card style by kind
}

SpanKind = enum { flow_root, step, rpc, subflow, other }

Why:

  • args + result let the viewer render input/output previews directly on the card, not just hidden behind a drawer.
  • source_file + source_line let clicking a node in the graph jump to the code that opened it, which is critical to the UX of Story 2's split view.
  • kind lets the viewer style cards distinctly (flow roots get a big prominent card; step cards look different from rpc cards; subflow cards show a "nested play" affordance).

Size concerns: args and result can be large. hero_tracing.py should truncate them with a reasonable cap (suggest 8 KB each; configurable via env var). Full bodies are still in the logs stream if users need them.
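
One way the cap might be applied is sketched below. The env var name `HERO_TRACING_MAX_PAYLOAD_BYTES` and the helper `serialize_capped` are hypothetical; the comment above only says the cap should be configurable and suggests 8 KB.

```python
import json
import os

# Hypothetical env var name; defaults to the suggested 8 KB cap.
MAX_BYTES = int(os.environ.get("HERO_TRACING_MAX_PAYLOAD_BYTES", str(8 * 1024)))


def serialize_capped(value, max_bytes: int = MAX_BYTES) -> str:
    """JSON-serialize value and truncate the result to at most max_bytes of UTF-8."""
    # default=repr keeps non-JSON-serializable objects from raising.
    text = json.dumps(value, default=repr, ensure_ascii=False)
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    # Cut on a byte budget, dropping any partial multi-byte character at the end.
    return raw[:max_bytes].decode("utf-8", errors="ignore")


small = serialize_capped({"a": 1}, max_bytes=8192)
big = serialize_capped("x" * 20000, max_bytes=8192)
accented = serialize_capped("é" * 10, max_bytes=4)
```

A truncated payload is no longer valid JSON, so a viewer would need to treat an over-cap value as an opaque preview string.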

hero_tracing.py contract update:

  • flow.step(name, **tags) — tags is the args payload at span open
  • flow.current_span.record(**kv) — records result payload (overwrites if called multiple times)
  • instrument(client) proxy — captures request args (method kwargs) as the span's args, captures response as result
  • Root @flow span — args = the flow's declared inputs resolved from the input_data JSON, result = the function's return value
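
The source_file / source_line capture mentioned in the schema could be done roughly as follows. This is a sketch under assumptions: `caller_location` and its `skip` parameter are hypothetical names, and the right `skip` value in the real module depends on how many wrapper frames (for example contextlib's) sit between the helper and the flow author's code.

```python
import inspect
import os


def caller_location(skip: int = 1):
    """Return (basename, line) of the frame `skip` levels above this function.

    skip=1 is the direct caller. Falls back to ("", 0) when the frame is
    unavailable, matching the "zero when unavailable" rule in the schema.
    """
    try:
        frame = inspect.stack()[skip]
        return os.path.basename(frame.filename), frame.lineno
    except (IndexError, OSError):
        return "", 0


def _probe():
    # Reports the file and line of this call site inside _probe.
    return caller_location()


probe_file, probe_line = _probe()
missing = caller_location(skip=10_000)
```

`inspect.stack()` walks the whole stack and reads source context, so a cheaper alternative such as `sys._getframe` may be worth considering if spans are opened in hot loops.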

Done-criteria in the original issue body stay; just extend the test-fixture flow to confirm the enriched fields land correctly.

Reference
lhumina_code/hero_logic#11