Handling Slow Consumers with SSE Backpressure
Part of Rate Limiting & Backpressure Handling.
A slow SSE consumer is a connected client that drains its TCP receive window more slowly than the server fills it. The server’s write call then returns false (Node.js) or blocks (Go), and queued frames accumulate first in the kernel socket buffer and then, if unchecked, in your own application memory. Left alone, a handful of laggard clients can exhaust the heap of a process serving thousands of healthy connections.
This guide covers the full lifecycle: recognising the drain signal in Node.js and http.Flusher pressure in Go, attaching per-connection bounded queues, choosing a drop or coalesce policy that matches your event semantics, and finally hard-disconnecting clients that never recover.
Symptom & Developer Intent
The visible symptoms arrive in three forms:
- Node.js: res.write() returns false because the underlying socket buffer is full. If you keep writing without waiting for 'drain', events silently queue in the writable stream’s internal buffer until the process OOMs.
- Go: flusher.Flush() blocks or your goroutine stack grows because you are writing into an http.ResponseWriter whose underlying net.Conn has a full send buffer. The goroutine stalls and the goroutine count climbs.
- Heap graphs show steady growth correlated with connection age: not a leak in the classic sense, but per-connection event queues growing without bound.
Developer intent: write to each SSE connection at the producer’s rate, without blocking the producer and without allowing any single slow client to consume unbounded memory.
Root Cause Analysis
TCP Flow Control
HTTP/1.1 SSE rides a single persistent TCP connection. The kernel maintains a per-socket send buffer (on Linux it is auto-tuned within the bounds set by net.ipv4.tcp_wmem, typically tens to hundreds of kilobytes). When the remote receive window is full, because the client is paused, on a slow network, or has a frozen JS event loop, the send buffer stops draining. Once it is full, write(2) on a non-blocking socket returns EAGAIN. The runtime translates that into a high-level back-signal:
| Runtime | Back-signal | What happens if ignored |
|---|---|---|
| Node.js (stream.Writable) | write() returns false; 'drain' event fires when buffer clears | Internal write queue grows in heap |
| Go (net/http) | w.Write() blocks inside flusher.Flush() | Goroutine stack pinned; connection goroutine count rises |
| Python asyncio | StreamWriter.write() + drain() raises BrokenPipeError or stalls | Coroutine awaits forever without a timeout |
Why SSE is especially susceptible
Unlike a WebSocket server where you control framing and can detect read-side pressure, SSE is write-only. The server has no application-level signal from the client that it has processed an event. The only feedback is the TCP write pressure itself. Additionally, SSE clients reconnect automatically on disconnect, so dropping a connection is a recoverable operation — making disconnection a viable backpressure strategy.
Step-by-Step Resolution
Step 1 — Detect the drain event in Node.js
Wrap every SSE write in a check of the return value. When false is returned, stop producing until 'drain' fires.
// sse-server.js (Node.js, no framework)
import http from 'node:http';
import { randomUUID } from 'node:crypto';

const clients = new Map(); // id → { res, draining: boolean, queue: string[] }

http.createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // disable nginx buffering
  });
  res.write(':ok\n\n'); // initial comment keeps proxy alive

  const id = randomUUID();
  const client = { res, draining: false, queue: [] };
  clients.set(id, client);

  res.on('drain', () => {
    client.draining = false;
    // flush queued events now that the socket is writable
    flushQueue(client);
  });
  req.on('close', () => clients.delete(id));
}).listen(3000);

// Write queued frames until the queue is empty or the socket pushes back.
function flushQueue(client) {
  while (client.queue.length > 0 && !client.draining) {
    const frame = client.queue.shift();
    const ok = client.res.write(frame);
    if (!ok) {
      client.draining = true; // stop until 'drain'
      break;
    }
  }
}

// Queue one frame per client; slow clients keep it queued until 'drain'.
function broadcast(eventData) {
  const frame = `data: ${JSON.stringify(eventData)}\n\n`;
  for (const client of clients.values()) {
    client.queue.push(frame);
    if (!client.draining) flushQueue(client);
  }
}
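When a single async loop produces events for one connection, the same return-value check can be wrapped in a promise. The sketch below is one possible shape, not part of the server above: writeOrWait is a hypothetical helper name, and it assumes only that res exposes write(), once() and off() the way Node.js writable streams do.

```javascript
// Hypothetical helper: resolves right away when write() returns true,
// otherwise resolves once the stream emits 'drain'. Rejects if the
// connection closes while we are still waiting.
function writeOrWait(res, frame) {
  return new Promise((resolve, reject) => {
    if (res.write(frame)) {
      resolve();
      return;
    }
    const onDrain = () => {
      cleanup();
      resolve();
    };
    const onClose = () => {
      cleanup();
      reject(new Error('connection closed before drain'));
    };
    // Remove both listeners so repeated calls do not leak handlers.
    const cleanup = () => {
      res.off('drain', onDrain);
      res.off('close', onClose);
    };
    res.once('drain', onDrain);
    res.once('close', onClose);
  });
}
```

A per-connection producer could then call await writeOrWait(res, frame) inside its loop. This pattern suits one producer per connection; in a fan-out broadcast, awaiting one slow client would stall delivery to all the others, which is why the queue-based approach is used there.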
Step 2 — Enforce a bounded per-client queue
An unbounded client.queue is still a memory bomb — just a slower one. Cap the queue and apply a policy when it overflows.
const MAX_QUEUE = 128; // max frames buffered per client

function enqueue(client, frame, policy = 'drop-oldest') {
  if (client.queue.length >= MAX_QUEUE) {
    if (policy === 'drop-oldest') {
      client.queue.shift(); // discard oldest undelivered event
    } else if (policy === 'drop-newest') {
      return; // discard the incoming event
    } else if (policy === 'coalesce') {
      // replace tail with a synthetic summary event
      client.queue[client.queue.length - 1] =
        `data: {"type":"coalesced","dropped":true}\n\n`;
      return;
    }
  }
  client.queue.push(frame);
  if (!client.draining) flushQueue(client);
}
| Policy | Use case | Risk |
|---|---|---|
| drop-oldest | Live dashboards (newest value is truth) | Client misses intermediate states |
| drop-newest | Audit logs, financial ticks (must not lose early events) | Client falls further behind |
| coalesce | UI progress bars, count aggregations | Requires client-side handling of synthetic events |
| disconnect | Correctness-critical streams | Client reconnects and replays via Last-Event-ID |
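The coalesce policy can also record how many events the synthetic frame stands in for. The following is a minimal sketch under stated assumptions: makeCoalescingQueue is a hypothetical name, the type and count fields are illustrative, and maxQueue is assumed to be at least 1.

```javascript
// Sketch: bounded queue whose overflow policy folds dropped frames into a
// single synthetic "coalesced" frame at the tail. Names are illustrative.
function makeCoalescingQueue(maxQueue) {
  const frames = [];
  // > 0 means the tail is a synthetic frame representing that many events.
  let tailCount = 0;

  const synthetic = (count) =>
    `data: ${JSON.stringify({ type: 'coalesced', count })}\n\n`;

  return {
    push(frame) {
      if (frames.length < maxQueue) {
        frames.push(frame);
        tailCount = 0; // the tail is a real frame again
        return;
      }
      // Queue full: the incoming frame is dropped. If the tail is still a
      // real frame, it is dropped too and replaced by the synthetic marker.
      tailCount = tailCount > 0 ? tailCount + 1 : 2;
      frames[frames.length - 1] = synthetic(tailCount);
    },
    shift() {
      const frame = frames.shift();
      if (frames.length === 0) tailCount = 0;
      return frame;
    },
    get length() {
      return frames.length;
    },
  };
}
```

The queue never grows past maxQueue, and the client sees one marker telling it how many events it missed instead of a silent gap.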
Step 3 — Disconnect laggard clients that never recover
If a client’s queue stays at MAX_QUEUE for longer than a threshold, the connection is unlikely to recover. Terminate it so the client reconnects fresh (possibly to a different pod after scaling SSE across multiple nodes with Redis).
const LAGGARD_MS = 10_000; // 10 s at full queue → disconnect

function trackLaggard(client, id) {
  if (client.queue.length >= MAX_QUEUE) {
    client.laggardSince ??= Date.now();
    if (Date.now() - client.laggardSince > LAGGARD_MS) {
      console.warn(`[sse] disconnecting laggard client ${id}`);
      // destroy() rather than end(): end() waits for buffered data to
      // flush, which a stalled socket may never do
      client.res.destroy(); // triggers req 'close' → cleanup
    }
  } else {
    client.laggardSince = undefined;
  }
}

// sweep all clients every 2 s so stalled connections are caught even
// when no new events arrive
setInterval(() => {
  for (const [id, client] of clients.entries()) {
    trackLaggard(client, id);
  }
}, 2000);
Step 4 — Handle backpressure in Go with a channel-based queue
Go’s net/http handler runs one goroutine per connection. Blocking that goroutine on a slow write pins the goroutine. Instead, accept events into a buffered channel and detect a full channel as the backpressure signal.
// sse_handler.go (http.ResponseController requires Go 1.20+)
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const maxQueue = 128
const laggardTimeout = 10 * time.Second

type sseClient struct {
	events chan string
	done   chan struct{}
}

func NewSSEHandler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if _, ok := w.(http.Flusher); !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("X-Accel-Buffering", "no")

		client := &sseClient{
			events: make(chan string, maxQueue),
			done:   make(chan struct{}),
		}
		// register client (omit broker code for brevity)
		defer close(client.done)

		// A per-write deadline bounds how long a stalled client can block
		// this goroutine. A timer reset on every received frame would
		// instead disconnect idle streams and could not interrupt a
		// blocked Flush.
		rc := http.NewResponseController(w)
		for {
			select {
			case <-r.Context().Done():
				return // client disconnected
			case frame, ok := <-client.events:
				if !ok {
					return
				}
				if err := rc.SetWriteDeadline(time.Now().Add(laggardTimeout)); err != nil {
					log.Printf("sse: cannot set write deadline: %v", err)
					return
				}
				if _, err := fmt.Fprint(w, frame); err != nil {
					log.Printf("sse: laggard client disconnected: %v", err)
					return
				}
				if err := rc.Flush(); err != nil {
					log.Printf("sse: laggard client disconnected: %v", err)
					return // close the response
				}
			}
		}
	}
}

// Broker enqueue: non-blocking send with drop policy.
// The Broker type itself is part of the omitted broker code.
func (b *Broker) send(client *sseClient, frame string) {
	select {
	case client.events <- frame:
		// queued
	default:
		// queue full: drop-oldest by draining one slot first
		select {
		case <-client.events:
		default:
		}
		// retry without blocking; a concurrent sender may have refilled the slot
		select {
		case client.events <- frame:
		default:
			// still full: drop this frame
		}
	}
}
Full broker wiring is covered in Implementing SSE with Go Channels and http.Flusher.
Step 5 — Add a retry directive so disconnected clients replay missed events
When you disconnect a laggard, the EventSource retry mechanism fires automatically. Pair it with idempotent event IDs so the client can resume without duplicates.
// emit with ID so Last-Event-ID works on reconnect
function buildFrame(id, data) {
  return `id: ${id}\nretry: 3000\ndata: ${JSON.stringify(data)}\n\n`;
}
On reconnect, read req.headers['last-event-id'] and replay from your event store — or simply let the client receive the current state as a fresh snapshot.
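One way to back the replay path is a small in-memory buffer of recent frames. The sketch below assumes numeric, monotonically increasing event IDs; makeReplayBuffer, append and since are hypothetical names, and a production setup might keep the events in Redis or a database instead.

```javascript
// Sketch of an in-memory replay buffer keyed by an increasing numeric
// event ID. All names here are illustrative assumptions.
function makeReplayBuffer(capacity) {
  const events = []; // oldest first, at most `capacity` entries
  let nextId = 1;

  return {
    // Store an event and return the SSE frame to send to live clients.
    append(data) {
      const id = nextId++;
      const frame = `id: ${id}\ndata: ${JSON.stringify(data)}\n\n`;
      events.push({ id, frame });
      if (events.length > capacity) events.shift(); // evict the oldest
      return frame;
    },
    // Frames newer than lastEventId, or null when the gap cannot be filled
    // from memory and the caller should send a fresh snapshot instead.
    since(lastEventId) {
      const last = Number(lastEventId);
      if (!Number.isInteger(last) || last < 0) return null; // missing or malformed
      if (last >= nextId) return null; // an ID this buffer never issued
      const oldest = events.length > 0 ? events[0].id : nextId;
      if (last < oldest - 1) return null; // needed events were evicted
      return events.filter((e) => e.id > last).map((e) => e.frame);
    },
  };
}
```

On reconnect, a handler could call buffer.since(req.headers['last-event-id']), write the returned frames when the result is an array, and fall back to a snapshot when it is null.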
Validation & Monitoring
Verify backpressure detection with tc netem
Simulate a slow network on loopback to trigger drain in your test environment:
# add 500 ms delay + 10 % packet loss to loopback (Linux)
sudo tc qdisc add dev lo root netem delay 500ms loss 10%
# open an SSE stream; watch for drain events in server logs
curl -N http://localhost:3000/events &
# remove when done
sudo tc qdisc del dev lo root
Unit-test queue overflow in Node.js
// queue.test.js (Node.js built-in test runner)
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';

describe('SSE bounded queue', () => {
  it('drops oldest when queue is full', () => {
    const queue = [];
    const MAX = 3;
    function enqueue(frame) {
      if (queue.length >= MAX) queue.shift(); // drop-oldest
      queue.push(frame);
    }
    for (let i = 0; i < 5; i++) enqueue(`event-${i}`);
    assert.equal(queue.length, 3);
    assert.equal(queue[0], 'event-2'); // oldest 0,1 dropped
  });
});
Prometheus metrics to expose
// metrics.js (prom-client snippet)
import { Gauge, Counter } from 'prom-client';

export const queueDepth = new Gauge({
  name: 'sse_client_queue_depth',
  help: 'Current event queue depth per client (sampled)',
  labelNames: ['client_id'],
});

export const droppedEvents = new Counter({
  name: 'sse_dropped_events_total',
  help: 'Events dropped due to full client queue',
  labelNames: ['policy'],
});

export const laggardDisconnects = new Counter({
  name: 'sse_laggard_disconnects_total',
  help: 'Clients disconnected for being laggards',
});
Alert when sse_dropped_events_total exceeds your SLA threshold or sse_client_queue_depth p99 approaches MAX_QUEUE.
Verification Checklist
⚡ Production Directives
- Cap per-client queues at 64–256 events; never use an unbounded array or channel.
- Choose drop-oldest for live data (metrics, dashboards) and disconnect for correctness-critical streams where clients must replay from Last-Event-ID.
- Set a laggard timeout of 5–30 s and disconnect; rely on EventSource auto-retry rather than holding the connection forever.
- Emit X-Accel-Buffering: no on every SSE response; nginx and Cloudflare buffering hides TCP back-pressure and inflates your perceived queue size.
- Alert on sse_dropped_events_total rate; a spike indicates either a slow CDN edge node or a client bug in event processing.
Frequently Asked Questions
Will disconnecting a slow client cause it to miss events permanently?
Only if you have no replay mechanism. Pair disconnection with id: fields on every frame and store recent events server-side (Redis list, Postgres, or an in-memory ring buffer). On reconnect the client sends Last-Event-ID and you replay from that point. This is exactly the flow described in the Event ID & Retry Mechanism Design guide.
How do I choose MAX_QUEUE?
Divide your target maximum per-client memory budget by your average frame size. For example, if you want at most 512 KB per client and frames average 256 bytes, MAX_QUEUE = 512 × 1024 / 256 = 2048. In practice 64–256 is a sensible default for most dashboards. Monitor sse_client_queue_depth p99 in production and raise or lower accordingly.
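The sizing rule can be written as a small helper. This is only a sketch: maxQueueFor is a hypothetical name, and the estimate counts frame payload bytes only, ignoring per-string and array overhead in the JavaScript heap.

```javascript
// Hypothetical sizing helper: how many average-sized frames fit in a
// per-client memory budget. Returns at least 1.
function maxQueueFor(budgetBytes, avgFrameBytes) {
  if (!(budgetBytes > 0) || !(avgFrameBytes > 0)) {
    throw new RangeError('both arguments must be positive numbers');
  }
  return Math.max(1, Math.floor(budgetBytes / avgFrameBytes));
}

console.log(maxQueueFor(512 * 1024, 256)); // 2048
```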
Does HTTP/2 multiplexing change this analysis?
Partially. HTTP/2 adds per-stream flow-control windows on top of TCP flow control, so you get two layers of back-pressure. The application-level queue strategy is identical. The HTTP/2 implementation handles WINDOW_UPDATE frames for you and surfaces an exhausted window through the same high-level signals: write-returns-false and 'drain' in Node.js, a blocking write in Go. The code above therefore still applies.
Can I use Node.js streams pipeline() instead of manual drain handling?
Yes, if your event source is itself a Readable stream. stream.pipeline(readable, res, callback) handles back-pressure automatically. The challenge is that most SSE brokers are push-based (events arrive via Redis pub/sub or an event emitter), not pull-based streams, so you still need the manual queue approach unless you wrap the broker in a Readable with _read().
What is the right behaviour when the queue hits MAX_QUEUE and we choose coalesce?
Replace the tail of the queue with a synthetic event such as {"type":"coalesced","count":N} and update a dropped-count field. The client must handle this event type by re-fetching the full current state (an HTTP GET to a REST snapshot endpoint). This approach is common in live scoreboard and stock-ticker UIs where the client only needs the latest value, not the sequence.
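On the client side, the handling described here might look like the sketch below. makeMessageHandler, applyEvent and refetchSnapshot are hypothetical names supplied by the application, and the /events and /snapshot URLs in the usage comment are assumptions too.

```javascript
// Sketch of client-side handling for synthetic "coalesced" events.
// `applyEvent` and `refetchSnapshot` are hypothetical application callbacks.
function makeMessageHandler({ applyEvent, refetchSnapshot }) {
  return function onMessage(message) {
    let payload;
    try {
      payload = JSON.parse(message.data);
    } catch {
      return; // ignore frames that are not valid JSON
    }
    if (payload && payload.type === 'coalesced') {
      // Events were dropped: discard incremental state and reload it all.
      refetchSnapshot(payload.count);
    } else {
      applyEvent(payload);
    }
  };
}

// Usage in a browser (URLs are assumptions):
//   const source = new EventSource('/events');
//   source.onmessage = makeMessageHandler({
//     applyEvent: (event) => render(event),
//     refetchSnapshot: () => fetch('/snapshot').then((r) => r.json()).then(render),
//   });
```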