SSE broadcast capacity of 64 can silently starve slow dashboard clients #51
lhumina_code/hero_router#51
Category: resource / low
What
`SseBroadcaster` wraps `tokio::sync::broadcast` with a fixed capacity, instantiated as `SseBroadcaster::new(64)` in `main.rs`. Subscribers that lag more than 64 messages behind the sender get `RecvError::Lagged(n)` on the next `recv()`, losing everything in between. If the SSE consumer loop treats `Lagged` as "skip and continue", the browser silently misses dashboard updates and shows stale state.
Where
- `crates/hero_router/src/server/sse.rs:54-56`
- `crates/hero_router/src/main.rs:302`: `SseBroadcaster::new(64)`
- `build_ui_router` SSE stream handler in `routes.rs` (grep `subscribe()`)
Why it's wrong
- A `cleanup_stale_sockets` round can produce `ServiceRemoved` + `StatusChange` + `Scan` events for many services within a few milliseconds, well over 64 if contexts × services is non-trivial.
- If the consumer loop is instead written as `while let Ok(msg) = rx.recv().await`, it exits the stream on `Lagged`, which breaks the SSE connection entirely. That is worse than lost events: the UI stops updating until the user reloads.
Reproduction
… `hero_proc`).
Suggested fix
Two independent improvements, both worth doing:
1. Increase capacity. 1024 or 4096 is a reasonable default: each event is one `String`, so the byte cost is modest. Expose it via an env var (e.g. `ROUTER_SSE_BUFFER`) so operators can tune it.
2. Handle `Lagged` explicitly in the consumer. On `RecvError::Lagged(n)`:
   - emit a `resync` event telling the client to re-fetch full state (e.g. via the `/api/services` JSON endpoint, or the `router.list` RPC) rather than continuing from the broken delta stream,
   - `warn!` with how many events were dropped.

This keeps the connection alive and gives the client a reliable recovery path instead of silently stale state.
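To make the `Lagged(n)` arithmetic concrete, here is a toy, single-threaded model of a broadcast ring buffer using only `std`. It is not the real `tokio::sync::broadcast` implementation, and the names `Ring` and `Recv` are hypothetical. It is meant to reproduce the behaviour described above: once the sender has overwritten slots a subscriber had not yet read, that subscriber gets a single `Lagged(n)` and then resumes from the oldest message still retained.

```rust
use std::collections::VecDeque;

/// Outcome of a receive on the toy channel. `Lagged(n)` mirrors the shape of
/// tokio's `RecvError::Lagged(n)`; `Empty` means "nothing new yet".
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Msg(T),
    Lagged(u64),
    Empty,
}

/// Toy model: the sender keeps only the newest `capacity` messages, and each
/// subscriber tracks the absolute index of the next message it wants to read.
pub struct Ring<T> {
    capacity: usize,
    buf: VecDeque<T>,
    /// Absolute index of the message currently stored at `buf[0]`.
    head: u64,
}

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, buf: VecDeque::with_capacity(capacity), head: 0 }
    }

    /// Append a message, overwriting the oldest one once the buffer is full.
    pub fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.head += 1;
        }
        self.buf.push_back(msg);
    }

    /// Receive for a subscriber whose cursor is `next`. If the cursor points
    /// at an overwritten message, report how many were skipped and jump the
    /// cursor forward to the oldest retained message.
    pub fn recv(&self, next: &mut u64) -> Recv<T> {
        if *next < self.head {
            let dropped = self.head - *next;
            *next = self.head;
            return Recv::Lagged(dropped);
        }
        let offset = (*next - self.head) as usize;
        match self.buf.get(offset) {
            Some(msg) => {
                *next += 1;
                Recv::Msg(msg.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

With capacity 2 and three sends (10, 20, 30), a subscriber that has read nothing sees `Lagged(1)` and then 20 and 30, which should mirror the lagging example in the `tokio::sync::broadcast` docs. With capacity 64 and a 100-event burst, the same subscriber would see `Lagged(36)`: everything between its cursor and the retained window is gone, which is exactly what a resync has to paper over.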
Implementation Spec: Issue #51 - SSE Broadcast Capacity Starvation
Objective
Eliminate silent starvation of slow or backgrounded SSE dashboard clients in `hero_router` by:
1. Raising the broadcast capacity (currently `64`) to a safer default and exposing it as a tunable env var.
2. Replacing the `.ok()` filtering in the SSE stream handler with explicit handling of `Lagged(n)` (warn + emit a `resync` SSE event) and of channel closure (cleanly end the stream).
3. Having the dashboard respond to the `resync` SSE event by triggering a full sidebar reload.
Requirements
- `ROUTER_SSE_BUFFER` is parsed at startup; default `1024`. It is parsed in the same style as the existing `ROUTER_PORT` in `crates/hero_router/src/config.rs`.
- `SseBroadcaster::new(...)` in `main.rs` must be sourced from the new config field, not a hard-coded `64`.
- `crates/hero_router/src/server/routes.rs` must:
  - `Ok(msg)`: forward as today.
  - `BroadcastStreamRecvError::Lagged(n)`: emit `tracing::warn!` and inject the SSE frame `event: resync\ndata: {"dropped": n}\n\n` into the stream.
  - Channel closed: end the stream cleanly (the `BroadcastStream` wrapper already yields `None` in this case).
  - Keep the existing `ping` keepalive.
- `crates/hero_router/static/js/dashboard.js` must register a listener for `event: resync` that calls the existing `reloadSidebar()` so the UI re-fetches authoritative state.
- `ROUTER_SSE_BUFFER` is documented in the `RouterConfig` doc-comment table.
Files to Modify
- `crates/hero_router/src/config.rs`: add `sse_buffer: usize` to `RouterConfig`; default-parse from `ROUTER_SSE_BUFFER` (default `1024`); add a `.sse_buffer(...)` builder; update the env-var doc table; extend tests.
- `crates/hero_router/src/main.rs`: replace `SseBroadcaster::new(64)` (~line 302) with `SseBroadcaster::new(cfg.sse_buffer)`.
- `crates/hero_router/src/server/sse.rs`: update the doc comment on `new` to reference the env var. No behavioural change.
- `crates/hero_router/src/server/routes.rs`: handle `BroadcastStreamRecvError::Lagged(n)` explicitly (yield resync frame + `warn!`). Keep the existing `keepalive` merge.
- `crates/hero_router/static/js/dashboard.js`: in `initSSE()`, add `es.addEventListener('resync', reloadSidebar)` next to the existing event listeners, with a console warning logging the drop count.
Implementation Plan
Step 1 - Add `sse_buffer` to `RouterConfig`
File: `crates/hero_router/src/config.rs`
- Add a `pub sse_buffer: usize` field (next to `probe_timeout_secs`).
- `Default`: parse `ROUTER_SSE_BUFFER`, rejecting `0` and non-numeric values and falling back to `1024`.
- Add a `.sse_buffer(mut self, n: usize) -> Self` builder.
- Add a `ROUTER_SSE_BUFFER` row to the env-var doc table at the top of the file.
- Extend `new_returns_default` to assert `cfg.sse_buffer == 1024`.
- Extend `builder_overrides_are_applied` to chain `.sse_buffer(2048)` and assert.
Dependencies: none.
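Step 1 can be sketched as follows. This is a minimal sketch, assuming a trimmed-down `RouterConfig` that carries only the new field; the free function `parse_sse_buffer` and the constant `DEFAULT_SSE_BUFFER` are hypothetical helpers, not names taken from `config.rs` (whose `ROUTER_PORT` parsing style is not shown here). Splitting the parse away from `env::var` lets it be unit-tested without mutating the process environment.

```rust
use std::env;

/// Fallback when `ROUTER_SSE_BUFFER` is unset, zero, or not a number.
pub const DEFAULT_SSE_BUFFER: usize = 1024;

/// Hypothetical stand-in for `RouterConfig`; the real struct has more fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouterConfig {
    pub sse_buffer: usize,
}

/// Parse a raw env-var value. `None`, `"0"` and non-numeric input all fall
/// back to the default instead of failing startup.
pub fn parse_sse_buffer(raw: Option<&str>) -> usize {
    raw.and_then(|s| s.parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_SSE_BUFFER)
}

impl Default for RouterConfig {
    fn default() -> Self {
        // Read the env var once; `.ok()` turns "unset" into `None`.
        let raw = env::var("ROUTER_SSE_BUFFER").ok();
        RouterConfig { sse_buffer: parse_sse_buffer(raw.as_deref()) }
    }
}

impl RouterConfig {
    /// Builder-style override, in the shape the spec describes.
    pub fn sse_buffer(mut self, n: usize) -> Self {
        self.sse_buffer = n;
        self
    }
}
```

The builder here is a plain setter. Since `tokio::sync::broadcast::channel` panics when given a capacity of 0, a builder that may receive arbitrary input might also want to reject 0; the env-var path in this sketch already does.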
Step 2 - Wire config into `SseBroadcaster::new`
File: `crates/hero_router/src/main.rs`
- Replace `let sse = SseBroadcaster::new(64);` (~line 302) with `let sse = SseBroadcaster::new(cfg.sse_buffer);`.
Dependencies: Step 1.
Step 3 - Update `SseBroadcaster` doc comment
File: `crates/hero_router/src/server/sse.rs`
- Extend the `pub fn new(capacity: usize)` doc comment: "Capacity is configured via `ROUTER_SSE_BUFFER` (default 1024) when constructed from `RouterConfig`."
Dependencies: none (independent of Steps 1-2).
Step 4 - Rewrite SSE handler with explicit `Lagged` handling
File: `crates/hero_router/src/server/routes.rs`
- Add `use tokio_stream::wrappers::errors::BroadcastStreamRecvError;`.
- Replace the current mapping (`.filter_map` with `.ok()`) so it explicitly matches `Ok(msg)` and `Err(BroadcastStreamRecvError::Lagged(n))`.
- Keep the `keepalive` block and the `merged` stream / response building exactly as today.
Dependencies: none (independent of Steps 1-3 in compilation, but logically belongs in the same change).
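The Step 4 mapping can be sketched as a pure function. To stay self-contained, this sketch defines a local enum with the same shape as `tokio_stream::wrappers::errors::BroadcastStreamRecvError` (a single `Lagged(u64)` variant) instead of importing it, uses `eprintln!` where the real handler would call `tracing::warn!`, and assumes the broadcast items are already-formatted SSE frame `String`s. `map_item`, `resync_frame` and `render_all` are hypothetical names. Channel closure needs no arm: the stream (an iterator here) simply ends.

```rust
/// Local stand-in with the same shape as tokio-stream 0.1's
/// `BroadcastStreamRecvError`; the real handler imports the real type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BroadcastStreamRecvError {
    /// The subscriber fell behind and `n` messages were overwritten.
    Lagged(u64),
}

/// SSE frame telling the dashboard to re-fetch authoritative state.
pub fn resync_frame(dropped: u64) -> String {
    format!("event: resync\ndata: {{\"dropped\":{dropped}}}\n\n")
}

/// Map one stream item to the text sent to the browser. The `match` is
/// exhaustive, so a new error variant would become a compile error here.
pub fn map_item(item: Result<String, BroadcastStreamRecvError>) -> String {
    match item {
        // Normal case: forward the pre-formatted frame unchanged.
        Ok(frame) => frame,
        // Lagged: log the drop count and tell the client to resync.
        Err(BroadcastStreamRecvError::Lagged(n)) => {
            eprintln!("SSE subscriber lagged; {n} events dropped, sending resync");
            resync_frame(n)
        }
    }
}

/// Iterator analogue of the stream pipeline: when the channel closes, the
/// wrapped stream yields `None`, so this simply stops producing frames.
pub fn render_all<I>(items: I) -> Vec<String>
where
    I: IntoIterator<Item = Result<String, BroadcastStreamRecvError>>,
{
    items.into_iter().map(map_item).collect()
}
```

In the real handler, a closure equivalent to `map_item` would be applied with `.map(...)` on the `BroadcastStream` before the keepalive merge, in place of the old `.ok()` filtering. The frame uses `{"dropped":n}` without a space, as in the implementation summary.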
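Step 5 - Add `resync` listener to `dashboard.js`
File: `crates/hero_router/static/js/dashboard.js`
- In `initSSE()`, after the existing `service_removed` listener, add a `resync` listener that logs the drop count via `console.warn` and calls `reloadSidebar()`.
Dependencies: Step 4 for end-to-end value, but the JS change is safe to ship independently.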
Step 6 - Build & verification
- `cargo build -p hero_router`
- `cargo test -p hero_router`
- Run with `ROUTER_SSE_BUFFER=4` to force lag and confirm the resync flow.
Acceptance Criteria
- `RouterConfig::default()` reads `ROUTER_SSE_BUFFER`, defaults to `1024`, and rejects `0`/non-numeric values.
- `RouterConfig` has a `sse_buffer(usize)` builder, mirrored in unit tests.
- `crates/hero_router/src/main.rs` no longer contains the literal `SseBroadcaster::new(64)`; capacity flows from `cfg.sse_buffer`.
- A lagging subscriber receives `event: resync`, and the server emits a `tracing::warn!` recording the drop count.
- `dashboard.js` registers a `resync` listener that calls `reloadSidebar`.
- `cargo build -p hero_router` and `cargo test -p hero_router` both succeed.
Notes
- Memory: each broadcast slot holds one `String` (typically <300 bytes), so 1024 slots cost under 300 KiB worst-case. That is 16x the current value, large enough to absorb a full `cleanup_stale_sockets` burst across realistic service counts.
- `BroadcastStreamRecvError`: in `tokio-stream 0.1` this enum currently has only the `Lagged(u64)` variant; closed channels yield `None` from the stream. The match in Step 4 is exhaustive today and gives the desired "stream ends on close" behaviour without re-implementing `BroadcastStream`.
- The resync payload carries `{"dropped":n}` so future client code can show a toast if desired. The current dashboard handler intentionally only logs and reloads.
- `tokio::sync::broadcast` uses a single ring buffer shared across subscribers, so raising capacity costs memory only once.
Test Results
`cargo test -p hero_router`
Updated `config::tests` (asserting the new `sse_buffer` default + builder):
- `config::tests::new_returns_default`: ok (asserts `cfg.sse_buffer == 1024`)
- `config::tests::builder_overrides_are_applied`: ok (asserts `.sse_buffer(2048)` round-trips)

`cargo build -p hero_router`: clean compile, no warnings introduced by this change.
Implementation Summary
Changes
`crates/hero_router/src/config.rs`
- `ROUTER_SSE_BUFFER` documented in the module-level table.
- New field `RouterConfig.sse_buffer: usize`.
- `Default::default()` parses `ROUTER_SSE_BUFFER`, rejects `0`/non-numeric, falls back to `1024`.
- New builder `.sse_buffer(usize) -> Self` mirroring the existing pattern.
- `new_returns_default` test asserts the new default (`1024`).
- `builder_overrides_are_applied` test chains `.sse_buffer(2048)` and asserts.

`crates/hero_router/src/main.rs`
- Replaced `SseBroadcaster::new(64)` with `SseBroadcaster::new(cfg.sse_buffer)`.

`crates/hero_router/src/server/sse.rs`
- Expanded the doc comment on `SseBroadcaster::new` to describe the env var, the lagged-subscriber semantics, and the `resync` SSE recovery path.

`crates/hero_router/src/server/routes.rs`
- Added `use tokio_stream::wrappers::errors::BroadcastStreamRecvError;`.
- `sse_handler` no longer silently drops `Err(_)` via `.ok()`. The new mapping:
  - `Ok(s)` -> forward.
  - `Err(BroadcastStreamRecvError::Lagged(n))` -> emit `tracing::warn!` and yield `event: resync\ndata: {"dropped":n}\n\n`.
  - Channel closed -> the stream ends cleanly (the wrapper yields `None`).

`crates/hero_router/static/js/dashboard.js`
- `initSSE()` now registers a `resync` listener that logs the drop count via `console.warn` and calls the existing idempotent `reloadSidebar()` so the UI re-fetches authoritative state.
Test Results
- `cargo test -p hero_router`: 71 passed, 0 failed.
- `cargo build -p hero_router`: clean.
Notes
- Default capacity is now `1024` (16x the previous `64`); each broadcast slot holds one short SSE-frame `String`, so worst-case memory is well under a MiB.
- `BroadcastStreamRecvError` currently has only the `Lagged(u64)` variant in `tokio-stream 0.1`; channel closure is signalled by `None` from the wrapper. The `match` is exhaustive today, and a future variant would surface as a compile error, which is intentional.
- Operators can tune capacity via `ROUTER_SSE_BUFFER=<n>` (n > 0). Values of `0` or non-numeric input are silently rejected and the default is used.

Pull request opened: #58
This PR implements the changes discussed in this issue.