LogEventBus leaks entries on executor task panic — switch to Drop-guarded RAII #123

Closed
opened 2026-05-24 10:14:08 +00:00 by sameh-farouk · 1 comment
Member

Symptom

LogEventBus (crates/hero_proc_server/src/logging/sse/mod.rs) holds one broadcast::Sender<LogEvent> per job, inserted by open(job_id) and removed by close(job_id). The supervisor's executor spawn-closure does the pairing:

let handle = tokio::spawn(async move {
    executor::run_job(...).await;
    if let Some(p) = probe_handle { p.abort(); }
    event_bus.close(job_id).await;          // ← only runs on the happy path
    active.lock().await.remove(&job_id);    // ← same
});

If executor::run_job panics (for example on a .unwrap() deep in the spawn chain or an assert! in the PTY path), the spawned task terminates with a JoinError (one for which is_panic() returns true) and neither close() nor the active.lock().await.remove(...) line runs. The entry leaks in the LogEventBus's HashMap<u32, broadcast::Sender> AND in the supervisor's active map. Note that a stack overflow or an allocation failure aborts the whole process by default rather than unwinding, so those failures are not sources of this leak.

Over a long-running daemon (the kind of "several hours uptime" that #122 reports), every panic leaves one more entry behind. With enough uptime and non-trivial job churn the HashMap grows without bound, and any subscriber attaching to a leaked job_id gets a Receiver that will never see a terminator, because the leaked Sender is never closed or dropped.
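
A minimal, std-only sketch of the failure mode described above. Everything here is a hypothetical stand-in, not code from the repository: `Registry` replaces LogEventBus, a plain thread replaces the tokio task, and `run_job` simply panics for odd job ids. It only illustrates that cleanup written after a panicking call never runs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the bus: job_id -> placeholder for a Sender.
#[derive(Clone, Default)]
struct Registry {
    inner: Arc<Mutex<HashMap<u32, ()>>>,
}

impl Registry {
    fn open(&self, job_id: u32) {
        self.inner.lock().unwrap().insert(job_id, ());
    }

    fn close(&self, job_id: u32) {
        self.inner.lock().unwrap().remove(&job_id);
    }

    fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}

/// Hypothetical job body: odd job ids panic, even ones finish normally.
fn run_job(job_id: u32) {
    if job_id % 2 == 1 {
        panic!("job {} failed", job_id);
    }
}

/// Runs `jobs` jobs with manual open/close pairing and returns how many
/// entries are still registered afterwards.
fn leaked_after(jobs: u32) -> usize {
    let registry = Registry::default();
    for job_id in 0..jobs {
        let reg = registry.clone();
        let worker = thread::spawn(move || {
            reg.open(job_id);
            run_job(job_id);
            reg.close(job_id); // skipped whenever run_job panics
        });
        let _ = worker.join(); // Err(_) for the panicking jobs
    }
    registry.len()
}
```

Under these assumptions, `leaked_after(6)` returns 3: jobs 1, 3 and 5 panic before `close` runs, so their entries stay in the map.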

Why a Drop guard is the right architectural fix

The current API is "manual lifecycle pairing" — open() and close() are explicit; the caller is responsible for ensuring both run. This is a Rust footgun because:

  • Panics, early returns, and aborts skip the close
  • A second close() after the first is a silent no-op (the entry's already gone) so there's no signal when something gets dropped wrong
  • Future refactors of the supervisor spawn closure can move/skip the cleanup line without compile-time complaint

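RAII via a Drop guard moves the lifecycle into the type system. This is a standard Rust pattern, used widely (MutexGuard, tokio::task::JoinSet aborting its tasks on drop, tokio_util::sync::DropGuard). A plain tokio JoinHandle is not an example: dropping it detaches the task rather than aborting it.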

Proposed fix shape

Switch the internal HashMap from tokio::sync::RwLock → std::sync::RwLock. The map operations are O(1) and finish in nanoseconds; there's no reason for the async-aware lock. Doing this makes open / publish / subscribe synchronous, AND makes the Drop body trivial (no tokio::spawn from inside Drop required):

pub struct LogEventBus {
    inner: Arc<std::sync::RwLock<HashMap<u32, broadcast::Sender<LogEvent>>>>,
}

pub struct OpenChannelGuard {
    bus: LogEventBus,
    job_id: u32,
}

impl LogEventBus {
    pub fn open(&self, job_id: u32) -> OpenChannelGuard {
        let (tx, _) = broadcast::channel(CHANNEL_CAPACITY);
        self.inner.write().unwrap().insert(job_id, tx);
        OpenChannelGuard { bus: self.clone(), job_id }
    }

    pub fn publish(&self, job_id: u32, line: String, stream: &str) { /* sync read */ }
    pub fn subscribe(&self, job_id: u32) -> Option<broadcast::Receiver<LogEvent>> { /* sync read */ }
    // close() goes away — Drop handles it
}

impl Drop for OpenChannelGuard {
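    fn drop(&mut self) {
        // Recover a poisoned lock instead of unwrapping: a second panic while
        // this guard is dropped during unwinding would abort the process.
        let removed = self.bus.inner.write().unwrap_or_else(|e| e.into_inner()).remove(&self.job_id);
        // The write lock is already released here, so send() runs unlocked.
        if let Some(tx) = removed {
            let data = serde_json::json!({ "job_id": self.job_id }).to_string();
            let _ = tx.send(LogEvent { job_id: self.job_id, event: "log.done".into(), data });
        }
    }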
}
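
To make the panic path concrete, here is a std-only sketch of the same guard shape. It is an illustration under stated assumptions, not the proposed implementation: `Bus` and `Guard` are hypothetical names, a `std::sync::mpsc` channel with a single receiver stands in for `tokio::sync::broadcast`, a thread stands in for the spawned task, and the event is a bare string instead of a LogEvent. The lock is recovered with `into_inner()` because `std::sync::RwLock` poisons when a thread panics while holding it, and a panic raised inside `drop` during unwinding aborts the process.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical std-only bus: one mpsc Sender per job.
#[derive(Clone, Default)]
struct Bus {
    inner: Arc<RwLock<HashMap<u32, Sender<String>>>>,
}

/// Removes the job's entry and sends the terminator when dropped.
struct Guard {
    bus: Bus,
    job_id: u32,
}

impl Bus {
    /// Registers the job and returns the guard plus the single receiver.
    fn open(&self, job_id: u32) -> (Guard, Receiver<String>) {
        let (tx, rx) = mpsc::channel();
        self.inner
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(job_id, tx);
        (Guard { bus: self.clone(), job_id }, rx)
    }

    fn contains(&self, job_id: u32) -> bool {
        self.inner
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .contains_key(&job_id)
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Remove in its own statement so the write lock is released
        // before the terminator is sent.
        let removed = self
            .bus
            .inner
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&self.job_id);
        if let Some(tx) = removed {
            let _ = tx.send("log.done".to_string());
        }
    }
}

/// Runs a job that panics and returns (entry still present?, events seen).
fn panic_demo() -> (bool, Vec<String>) {
    let bus = Bus::default();
    let (guard, rx) = bus.open(7);
    let worker = thread::spawn(move || {
        let _channel = guard; // dropped while the panic unwinds
        panic!("simulated failure inside the job");
    });
    assert!(worker.join().is_err());
    // The iterator ends once the only Sender has been dropped by the guard.
    let events: Vec<String> = rx.iter().collect();
    (bus.contains(7), events)
}
```

In this sketch the receiver sees the single `log.done` terminator and then the channel closes, and the map no longer contains job 7 even though the worker panicked.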

Call-site change in supervisor/mod.rs:

let handle = tokio::spawn(async move {
    let _channel = event_bus.open(job_id);   // RAII guard, lives until task end
    executor::run_job(...).await;
    if let Some(p) = probe_handle { p.abort(); }
    active.lock().await.remove(&job_id);     // unchanged: still happy-path only (see "Not in scope")
    // _channel drops here on success OR panic
});

Three things this gets:

  1. Leak fix: panic-safe by construction
  2. API simplification: open/publish/subscribe no longer need .await; ~6 call sites in executor.rs and web.rs drop the .await suffix
  3. Performance: an uncontended std::sync::RwLock acquisition is expected to be cheaper than going through tokio's async lock (on the order of microseconds → nanoseconds per operation), which matters on hot publish paths with a high log rate

Tests

  • Unit test: open(jid) returns guard; drop guard; verify entry is gone from inner and a final log.done event went to any active subscriber
  • Unit test: subscribe twice, drop guard, verify both subscribers receive the final log.done event and then see RecvError::Closed
  • Integration: existing functional::uc_23_26_logs_sse should keep passing

Not in scope

  • The supervisor's active map leak on panic — separate issue, same root cause family, different fix surface
  • #122 futex wedge — this fix REDUCES surface area for that bug class but doesn't claim to fix #122 itself

Will pick this up autonomously unless someone wants to take it.

Author
Member

Verified end-to-end before push. Beyond the 6 new unit tests + the existing SSE integration suite:

  • Started hero_proc_server in a fresh PATH_ROOT sandbox
  • Defined a long-running service, started it, subscribed via GET /events?job_id=<id> over SSE
  • Received real log.line events as the job ran
  • Cancelled the job mid-run via job.cancel
  • Subscriber received the terminal log.done event with {"job_id":N} payload, then the SSE stream closed cleanly
  • Re-subscribing returned no active log stream for job N — the bus entry was reclaimed

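Cancel and panic reach the same Drop path: a panic unwinds the task's stack, while a cancel that aborts the task drops its future at an await point. Either way the spawn closure's locals, including the guard, are dropped. The unit test panic_inside_spawned_task_still_drops_guard exercises the panic case; the manual verification above exercises the cancel case.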
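
The cancel half of that claim can be sketched without a runtime. This is an illustrative, std-only example with hypothetical names (`FlagOnDrop` stands in for OpenChannelGuard, `NoopWake` is a do-nothing waker used to poll by hand); it assumes cancellation amounts to dropping a future that is suspended at an await point, which is how an aborted tokio task releases its state.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Sets a flag when dropped; stands in for the channel guard.
struct FlagOnDrop(Arc<AtomicBool>);

impl Drop for FlagOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a never-finishing "job" once, then drops it, as a cancel would.
/// Returns (guard dropped before cancel?, guard dropped after cancel?).
fn cancel_demo() -> (bool, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let flag = FlagOnDrop(dropped.clone());

    let mut job: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
        let _channel = flag; // local held across the await
        std::future::pending::<()>().await; // never completes on its own
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // First poll moves the guard into the future's state and suspends.
    assert!(matches!(job.as_mut().poll(&mut cx), Poll::Pending));

    let before = dropped.load(Ordering::SeqCst);
    drop(job); // cancellation: the suspended future is dropped, not unwound
    let after = dropped.load(Ordering::SeqCst);
    (before, after)
}
```

Here the guard's `drop` has not run while the job is suspended, and it has run by the time the future itself is dropped, with no panic or unwinding involved.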

Wire surface (GET /events, log.line / log.done JSON payloads) unchanged — no SDK regen, no consumer rebuild.

Reference
lhumina_code/hero_proc#123