SSE broadcast capacity of 64 can silently starve slow dashboard clients #51

Open
opened 2026-04-22 14:56:02 +00:00 by rawdaGastan · 4 comments

Category: resource / low

What

SseBroadcaster wraps tokio::sync::broadcast with a fixed capacity, instantiated as SseBroadcaster::new(64) in main.rs. A subscriber that falls more than 64 messages behind the sender gets RecvError::Lagged(n) on its next recv(), and the n skipped messages are gone. If the SSE consumer loop treats Lagged as "skip and continue", the browser silently misses dashboard updates and shows stale state.
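To make the failure mode concrete, here is a toy, std-only model of the lag semantics described above. It is not tokio's implementation: ToyBroadcast, Recv and the cursor-based recv are hypothetical, chosen so the behaviour can be checked without an async runtime. The model keeps only the newest cap messages. A subscriber whose next message has already been overwritten gets Lagged(n) once and then resumes at the oldest message still retained.

```rust
use std::collections::VecDeque;

/// Outcome of one receive on the toy channel (shaped like tokio's RecvError cases).
#[derive(Debug, PartialEq)]
enum Recv {
    Msg(String),
    Lagged(u64),
    Empty,
}

/// Toy broadcast channel: one shared buffer holding the last `cap` messages,
/// each tagged with a monotonically increasing sequence number.
struct ToyBroadcast {
    cap: usize,
    buf: VecDeque<(u64, String)>,
    next_seq: u64,
}

impl ToyBroadcast {
    fn new(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be non-zero");
        Self { cap, buf: VecDeque::with_capacity(cap), next_seq: 0 }
    }

    fn send(&mut self, msg: &str) {
        if self.buf.len() == self.cap {
            self.buf.pop_front(); // buffer full: the oldest message is overwritten
        }
        self.buf.push_back((self.next_seq, msg.to_string()));
        self.next_seq += 1;
    }

    /// `cursor` is this subscriber's next expected sequence number.
    fn recv(&self, cursor: &mut u64) -> Recv {
        let oldest = match self.buf.front() {
            Some((seq, _)) => *seq,
            None => return Recv::Empty,
        };
        if *cursor < oldest {
            // The messages this subscriber still needed were overwritten.
            let skipped = oldest - *cursor;
            *cursor = oldest; // resume from the oldest retained message
            return Recv::Lagged(skipped);
        }
        match self.buf.get((*cursor - oldest) as usize) {
            Some((_, msg)) => {
                *cursor += 1;
                Recv::Msg(msg.clone())
            }
            None => Recv::Empty, // caught up with the sender
        }
    }
}
```

With cap = 2 and sends of "10", "20" and "30", a subscriber that has read nothing yet first sees Lagged(1), then "20", then "30", which mirrors the lagging example in tokio's broadcast documentation. With cap = 64 the same thing happens to any dashboard client that misses more than 64 events in a burst.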

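Where

  • Channel definition: crates/hero_router/src/server/sse.rs:54-56
  • Capacity constant: crates/hero_router/src/main.rs:302 (SseBroadcaster::new(64))
  • Consumer: the build_ui_router SSE stream handler in routes.rs (grep for subscribe())
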
Why it's wrong

  • 64 is low relative to burst rates. One cleanup_stale_sockets round can produce ServiceRemoved + StatusChange + Scan events for many services in a few ms — well over 64 if contexts × services is non-trivial.
  • A backgrounded tab (Chrome throttles hidden tabs aggressively), a slow mobile connection, or a client behind a stalled proxy easily falls more than 64 messages behind.
  • If the stream handler consumes the receiver with a naive while let Ok(msg) = rx.recv().await loop, the loop exits on the first Lagged and the SSE connection breaks. That is worse than lost events: the UI stops updating until the user reloads.
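The difference between the naive loop and explicit handling can be sketched without tokio by scripting the outcomes of successive recv() calls on one subscriber. Outcome, naive_frames and explicit_frames are hypothetical names, and the frames use the same raw "event:/data:" text layout as the resync frame in the implementation spec.

```rust
/// Hypothetical scripted outcomes of successive recv() calls on one subscriber.
#[derive(Clone, Debug)]
enum Outcome {
    Msg(&'static str),
    Lagged(u64),
    Closed,
}

/// Naive consumer, equivalent to `while let Ok(msg) = rx.recv().await`:
/// the first error of any kind ends the loop, and with it the SSE response.
fn naive_frames(script: &[Outcome]) -> Vec<String> {
    let mut frames = Vec::new();
    for o in script {
        match o {
            Outcome::Msg(m) => frames.push(format!("data: {}\n\n", m)),
            _ => break, // Lagged and Closed both stop the stream
        }
    }
    frames
}

/// Explicit consumer: a lag becomes a `resync` frame and the stream continues;
/// only channel closure ends it.
fn explicit_frames(script: &[Outcome]) -> Vec<String> {
    let mut frames = Vec::new();
    for o in script {
        match o {
            Outcome::Msg(m) => frames.push(format!("data: {}\n\n", m)),
            Outcome::Lagged(n) => {
                frames.push(format!("event: resync\ndata: {{\"dropped\":{}}}\n\n", n))
            }
            Outcome::Closed => break,
        }
    }
    frames
}
```

For the script Msg("a"), Lagged(3), Msg("b"), Closed, the naive consumer emits a single frame and stops, while the explicit consumer emits "a", a resync frame reporting 3 dropped events, and "b" before ending on Closed.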

Reproduction

  1. Open the dashboard. Background the tab or throttle the browser's network to "Slow 3G".
  2. Trigger a scan while many services change state at once (e.g. restart hero_proc).
  3. Return to the tab — UI shows wrong service counts / stale health until next scan or page reload.

Suggested fix

Two independent improvements, both worth doing:

1. Increase capacity. 1024 or 4096 is a reasonable default; each slot holds one String per event, so the memory cost is modest. Expose it via an env var (e.g. ROUTER_SSE_BUFFER) so operators can tune it.

2. Handle Lagged explicitly in the consumer. On RecvError::Lagged(n):

  • emit an SSE event instructing the client to re-sync by fetching the full state (e.g. call the existing /api/services JSON endpoint, or the router.list RPC) rather than continuing from the broken delta stream,
  • log a warn! with how many events were dropped.
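```rust
use std::convert::Infallible;
use axum::response::sse::Event;
use tokio::sync::broadcast::error::RecvError;
// `rx` is the handler's `mut broadcast::Receiver<String>`; `yield` assumes `async_stream::stream!`.
let stream = async_stream::stream! {
    loop {
        match rx.recv().await {
            Ok(msg) => yield Ok::<_, Infallible>(Event::default().data(msg)),
            Err(RecvError::Lagged(n)) => {
                tracing::warn!("SSE client lagged, dropped {n} events");
                // Tell the client to refetch full state instead of trusting the deltas.
                yield Ok(Event::default().event("resync").data(""));
            }
            // All senders dropped: end the stream cleanly.
            Err(RecvError::Closed) => break,
        }
    }
};
```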

This keeps the connection alive and gives the client a reliable recovery path instead of silently-stale state.

rawdaGastan added this to the ACTIVE project 2026-04-23 11:14:35 +00:00

Implementation Spec: Issue #51 - SSE Broadcast Capacity Starvation

Objective

Eliminate silent starvation of slow / backgrounded SSE dashboard clients in hero_router by:

  1. Raising the broadcast channel capacity (currently 64) to a safer default and exposing it as a tunable env var.
  2. Replacing the silent .ok() filtering in the SSE stream handler with explicit handling of Lagged(n) (warn + emit a resync SSE event) and channel closure (cleanly end the stream).
  3. Teaching the dashboard JavaScript to react to the new resync SSE event by triggering a full sidebar reload.

Requirements

  • New env var ROUTER_SSE_BUFFER parsed at startup; default 1024. Parsed in the same style as the existing ROUTER_PORT in crates/hero_router/src/config.rs.
  • SseBroadcaster::new(...) in main.rs must be sourced from the new config field, not a hard-coded 64.
  • The SSE stream handler in crates/hero_router/src/server/routes.rs must:
    • On Ok(msg): forward as today.
    • On BroadcastStreamRecvError::Lagged(n): emit tracing::warn! and inject the SSE frame event: resync\ndata: {"dropped":n}\n\n into the stream.
    • On channel closure: terminate the stream cleanly (the BroadcastStream wrapper already yields None in this case).
    • Continue to merge with the 15-second ping keepalive.
  • The dashboard JS at crates/hero_router/static/js/dashboard.js must register a listener for event: resync that calls the existing reloadSidebar() so the UI re-fetches authoritative state.
  • Documentation for the new env var added to the RouterConfig doc-comment table.

Files to Modify

  • crates/hero_router/src/config.rs: add field sse_buffer: usize to RouterConfig; parse it in Default from ROUTER_SSE_BUFFER (default 1024); add a .sse_buffer(...) builder; update the env-var doc table; extend tests.
  • crates/hero_router/src/main.rs: replace the hard-coded SseBroadcaster::new(64) (~line 302) with SseBroadcaster::new(cfg.sse_buffer).
  • crates/hero_router/src/server/sse.rs: update the doc comment on new to reference the env var. No behavioural change.
  • crates/hero_router/src/server/routes.rs: rewrite the SSE stream mapping to handle BroadcastStreamRecvError::Lagged(n) explicitly (yield a resync frame + warn!). Keep the existing keepalive merge.
  • crates/hero_router/static/js/dashboard.js: in initSSE(), next to the existing event listeners, register a resync listener that logs the drop count with console.warn and calls reloadSidebar().

Implementation Plan

Step 1 - Add sse_buffer to RouterConfig

File: crates/hero_router/src/config.rs

  • Add pub sse_buffer: usize field (next to probe_timeout_secs).
  • Parse in Default:
    let sse_buffer = std::env::var("ROUTER_SSE_BUFFER")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(1024);
    
  • Add .sse_buffer(mut self, n: usize) -> Self builder.
  • Add ROUTER_SSE_BUFFER row to the env-var doc table at the top of the file.
  • Extend new_returns_default to assert cfg.sse_buffer == 1024.
  • Extend builder_overrides_are_applied to chain .sse_buffer(2048) and assert.

Dependencies: none.
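One possible shape for Step 1, assuming the parsing is split into a pure function so the unit tests do not have to mutate process-wide environment variables (the environment is shared by parallel test threads, and std::env::set_var is unsafe in the 2024 edition). parse_sse_buffer, sse_buffer_from_env and DEFAULT_SSE_BUFFER are hypothetical names. Rejecting 0 matters beyond tidiness: tokio::sync::broadcast::channel panics when given a capacity of zero.

```rust
/// Default SSE broadcast capacity from the spec.
const DEFAULT_SSE_BUFFER: usize = 1024;

/// Pure parsing step: missing, non-numeric and zero values all fall back to the default.
fn parse_sse_buffer(raw: Option<&str>) -> usize {
    raw.and_then(|v| v.parse::<usize>().ok())
        .filter(|n| *n > 0)
        .unwrap_or(DEFAULT_SSE_BUFFER)
}

/// Thin wrapper that RouterConfig::default() could call (hypothetical helper).
fn sse_buffer_from_env() -> usize {
    parse_sse_buffer(std::env::var("ROUTER_SSE_BUFFER").ok().as_deref())
}
```

The existing new_returns_default and builder_overrides_are_applied tests would stay as planned; the pure function just makes the 0 and non-numeric cases testable without touching the environment.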

Step 2 - Wire config into SseBroadcaster::new

File: crates/hero_router/src/main.rs

  • Replace let sse = SseBroadcaster::new(64); (~line 302) with let sse = SseBroadcaster::new(cfg.sse_buffer);.

Dependencies: Step 1.

Step 3 - Update SseBroadcaster doc comment

File: crates/hero_router/src/server/sse.rs

  • Add note to pub fn new(capacity: usize) doc-comment: "Capacity is configured via ROUTER_SSE_BUFFER (default 1024) when constructed from RouterConfig."
  • No code change.

Dependencies: none (independent of Steps 1-2).

Step 4 - Rewrite SSE handler with explicit Lagged handling

File: crates/hero_router/src/server/routes.rs

  • Add import: use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
  • Replace the broadcast stream mapping (currently uses .filter_map with .ok()) so it explicitly matches:
    let stream = BroadcastStream::new(rx).filter_map(|msg| async move {
        match msg {
            Ok(s) => Some(Ok::<_, Infallible>(s)),
            Err(BroadcastStreamRecvError::Lagged(n)) => {
                tracing::warn!("SSE client lagged, dropped {n} events; sending resync");
                Some(Ok::<_, Infallible>(format!(
                    "event: resync\ndata: {{\"dropped\":{n}}}\n\n"
                )))
            }
        }
    });
    
    Keep the keepalive block and the merged stream / response building exactly as today.

Dependencies: none (independent of Steps 1-3 in compilation, but logically belongs in the same change).

Step 5 - Add resync listener to dashboard.js

File: crates/hero_router/static/js/dashboard.js

  • Inside initSSE(), after the existing service_removed listener, add:
    es.addEventListener('resync', (ev) => {
        try {
            const data = ev.data ? JSON.parse(ev.data) : {};
            console.warn('[hero_router] SSE resync requested, dropped events:', data.dropped);
        } catch (_) { /* tolerate non-JSON */ }
        reloadSidebar();
    });
    

Dependencies: Step 4 for end-to-end value, but JS change is safe to ship independently.
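Why the dashboard needs addEventListener('resync', ...) rather than relying on onmessage: an EventSource delivers frames that carry an event: field only to listeners registered for that name, and frames without one as "message". The simplified, std-only parser below (a hypothetical helper, not part of the codebase) applies the event-stream field rules to show how the server's frames are classified. It ignores id:/retry: and treats lines starting with ":" as comments. Whether the 15-second keepalive is sent as a comment or as a named ping event is not stated in this issue, so the comment line in the test is purely illustrative.

```rust
/// One dispatched SSE event: the `event:` name (default "message") and its joined data.
#[derive(Debug, PartialEq)]
struct SseEvent {
    event: String,
    data: String,
}

/// Simplified event-stream parser: a blank line dispatches the pending event,
/// `:` lines are comments, one space after the colon is stripped, and multiple
/// `data:` lines are joined with '\n'. An unterminated trailing event is dropped.
fn parse_sse(body: &str) -> Vec<SseEvent> {
    let mut out = Vec::new();
    let mut event = String::new();
    let mut data: Vec<String> = Vec::new();
    for line in body.lines() {
        if line.is_empty() {
            if !data.is_empty() {
                let name = if event.is_empty() { "message".to_string() } else { event.clone() };
                out.push(SseEvent { event: name, data: data.join("\n") });
            }
            event.clear();
            data.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment line, e.g. a keepalive
        }
        let (field, value) = match line.find(':') {
            Some(i) => (&line[..i], &line[i + 1..]),
            None => (line, ""),
        };
        let value = value.strip_prefix(' ').unwrap_or(value);
        match field {
            "event" => event = value.to_string(),
            "data" => data.push(value.to_string()),
            _ => {} // id:, retry: and unknown fields are ignored in this sketch
        }
    }
    out
}
```

Fed a plain update, the resync frame from the spec and a comment, the parser yields a "message" event and a "resync" event; an onmessage handler would see only the first, which is why Step 5 registers a dedicated listener.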

Step 6 - Build & verification

  • cargo build -p hero_router
  • cargo test -p hero_router
  • Manual check with ROUTER_SSE_BUFFER=4 to force lag and confirm the resync flow.

Acceptance Criteria

  • RouterConfig::default() reads ROUTER_SSE_BUFFER, defaults to 1024, and rejects 0/non-numeric values.
  • RouterConfig has a sse_buffer(usize) builder, mirrored in unit tests.
  • crates/hero_router/src/main.rs no longer contains the literal SseBroadcaster::new(64); capacity flows from cfg.sse_buffer.
  • SSE handler no longer silently drops errors; lagged subscribers receive event: resync and the server emits a tracing::warn! recording the drop count.
  • When the broadcast channel closes, the SSE response terminates cleanly.
  • dashboard.js registers a resync listener that calls reloadSidebar.
  • cargo build -p hero_router and cargo test -p hero_router both succeed.

Notes

  • Default value (1024). One broadcast slot stores one String (typically <300 bytes), so 1024 slots hold roughly 300 KiB of frame data at typical message sizes. That is 16x the current value, large enough to absorb a full cleanup_stale_sockets burst across realistic service counts.
  • BroadcastStreamRecvError. In tokio-stream 0.1 this enum currently has only the Lagged(u64) variant; closed channels yield None from the stream. The match above is exhaustive today and gives the desired "stream ends on close" behaviour without re-implementing BroadcastStream.
  • Resync payload. Small JSON literal {"dropped":n} so future client code can show a toast if desired. The current dashboard handler intentionally only logs and reloads.
  • tokio::sync::broadcast. A single shared ring buffer across subscribers, so raising capacity costs memory only once.
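The memory figure in the first note can be reproduced with a back-of-the-envelope function. It assumes, as in current tokio releases, that the requested capacity is rounded up to the next power of two, that each slot stores one String (a pointer/length/capacity header plus its heap bytes), and it ignores per-slot bookkeeping; estimate_buffer_bytes is a hypothetical helper.

```rust
use std::mem::size_of;

/// Rough estimate of the shared ring buffer's memory once every slot is filled.
/// Assumes power-of-two rounding of the capacity and ignores per-slot bookkeeping
/// (sequence number, lock, remaining-receiver count).
fn estimate_buffer_bytes(capacity: usize, avg_payload_bytes: usize) -> usize {
    let slots = capacity.next_power_of_two();
    slots * (size_of::<String>() + avg_payload_bytes)
}
```

On a 64-bit target a String header is 24 bytes, so 1024 slots of ~300-byte frames come to 1024 × 324 = 331,776 bytes, about 324 KiB. Because the buffer is shared, that cost does not grow with the number of dashboard subscribers.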

Test Results

cargo test -p hero_router

  • Total: 71
  • Passed: 71
  • Failed: 0
  • Ignored: 0

Updated config::tests (asserting the new sse_buffer default + builder):

  • config::tests::new_returns_default — ok (asserts cfg.sse_buffer == 1024)
  • config::tests::builder_overrides_are_applied — ok (asserts .sse_buffer(2048) round-trips)

cargo build -p hero_router — clean compile, no warnings introduced by this change.


Implementation Summary

Changes

crates/hero_router/src/config.rs

  • New env var ROUTER_SSE_BUFFER documented in the module-level table.
  • New field RouterConfig.sse_buffer: usize.
  • Default::default() parses ROUTER_SSE_BUFFER, rejects 0/non-numeric, falls back to 1024.
  • New builder .sse_buffer(usize) -> Self mirroring the existing pattern.
  • new_returns_default test asserts the new default (1024).
  • builder_overrides_are_applied test chains .sse_buffer(2048) and asserts.

crates/hero_router/src/main.rs

  • Replaced the hard-coded SseBroadcaster::new(64) with SseBroadcaster::new(cfg.sse_buffer).

crates/hero_router/src/server/sse.rs

  • Extended the doc comment on SseBroadcaster::new to describe the env var, the lagged-subscriber semantics, and the resync SSE recovery path.

crates/hero_router/src/server/routes.rs

  • Added use tokio_stream::wrappers::errors::BroadcastStreamRecvError;.
  • sse_handler no longer silently drops Err(_) via .ok(). The new mapping:
    • Ok(s) -> forward.
    • Err(BroadcastStreamRecvError::Lagged(n)) -> emit tracing::warn! and yield event: resync\ndata: {"dropped":n}\n\n.
    • Closed channel still terminates the stream cleanly (BroadcastStream wrapper yields None).
  • Existing keepalive merge and response wiring unchanged.

crates/hero_router/static/js/dashboard.js

  • initSSE() now registers a resync listener that logs the drop count via console.warn and calls the existing idempotent reloadSidebar() so the UI re-fetches authoritative state.

Test Results

cargo test -p hero_router -- 71 passed, 0 failed.
cargo build -p hero_router -- clean.

Notes

  • Default capacity is 1024 (16x the previous 64); each broadcast slot holds one short SSE-frame String, so worst-case memory is well under a MiB.
  • BroadcastStreamRecvError currently has only the Lagged(u64) variant in tokio-stream 0.1; channel closure is signalled by None from the wrapper. The match is exhaustive today and a future variant would surface as a compile error -- intentional.
  • Operators can tune the buffer at startup via ROUTER_SSE_BUFFER=<n> (n > 0). Values of 0 or non-numeric input are silently rejected and the default is used.

Pull request opened: #58 (https://forge.ourworld.tf/lhumina_code/hero_router/pulls/58)

This PR implements the changes discussed in this issue.
Reference: lhumina_code/hero_router#51