Applying Token-Bucket Rate Limiting to Event Streams
Part of Rate Limiting & Backpressure Handling.
Unbounded SSE producers can saturate TCP send buffers faster than clients consume them. The browser’s EventSource object receives an abrupt stream termination, logs net::ERR_CONNECTION_RESET or net::ERR_HTTP2_STREAM_ERROR, and restarts from Last-Event-ID — creating reconnection storms that compound the overload. A token-bucket algorithm paces event emission per stream (or per IP) to a configurable sustained rate with a defined burst ceiling, absorbing producer spikes without dropping the connection.
Symptom and Developer Intent
Clients report one or more of the following during load spikes or high-frequency producer scenarios:
- net::ERR_CONNECTION_RESET in Chrome DevTools Network tab (filter event-stream)
- net::ERR_HTTP2_STREAM_ERROR on HTTP/2 endpoints behind reverse proxies
- Node.js server logs show Error: write after end or ECONNRESET
- EventSource.readyState oscillates between 1 (OPEN) and 0 (CONNECTING) under load
- Gaps in lastEventId sequence indicating silent drops before the OS-level disconnect
The intent is to cap how many SSE events a server writes per unit time — per connection, per authenticated user, or per originating IP — without closing the stream or losing high-priority state updates.
Root Cause Analysis
SSE streams are persistent HTTP responses. The server calls res.write() (Node.js), w.Write() + w.(http.Flusher).Flush() (Go), or StreamingResponse yields (Python/FastAPI) in a tight loop driven by an upstream producer. When the producer emits faster than the client’s TCP receive window drains, the kernel’s socket send buffer fills. In Node.js, res.write() then returns false as a backpressure signal, but most naive implementations ignore it and keep calling write(). Each call still queues its data in Node's internal stream buffer, so the heap grows without bound until the process runs out of memory or the socket is forcibly closed.
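A minimal sketch of honoring that signal, assuming a hypothetical helper named createGuardedWriter (it is not part of Node.js). Once write() returns false, the helper refuses further chunks until 'drain' fires. The demo uses a deliberately slow stream.Writable with a tiny buffer as a stand-in for a congested response.

```javascript
// backpressure-guard.js (illustrative; createGuardedWriter is a hypothetical helper)
const { Writable } = require('stream');

function createGuardedWriter(stream) {
  let blocked = false;
  // 'drain' fires once the stream's internal buffer has emptied after a false write()
  stream.on('drain', () => { blocked = false; });

  return function tryWrite(chunk) {
    if (blocked) return 'deferred'; // caller should queue or drop the chunk
    // A false return still accepts the chunk; it only asks the caller to pause
    if (!stream.write(chunk)) blocked = true;
    return 'written';
  };
}

// Demo sink: 8-byte buffer, and writes are never completed
const pendingCallbacks = [];
const slowSink = new Writable({
  highWaterMark: 8,
  write(chunk, encoding, callback) { pendingCallbacks.push(callback); },
});

const tryWrite = createGuardedWriter(slowSink);
const first = tryWrite('data: 0123456789\n\n'); // accepted, but exceeds the buffer
const second = tryWrite('data: next\n\n');       // refused until 'drain'
console.log(first, second);
```

The same wrapper should work around an http.ServerResponse, since it is also a Writable; what to do with a deferred chunk (queue or drop) is a policy decision left to the caller.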
Fixed-window counters (e.g., “100 events per second, reset at :00”) create a saw-tooth burst pattern: all 100 tokens are consumed in the first 50 ms, then the stream stalls for 950 ms, then bursts again. This violates SSE’s continuous delivery model and triggers intermediate proxy read timeouts (Nginx proxy_read_timeout, Cloudflare 100-second idle limit). The token-bucket model refills continuously, smoothing throughput and keeping the connection alive even under rate enforcement.
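The difference in pacing can be illustrated with a small simulation. This is a sketch under assumed parameters (one event offered every millisecond for two seconds, a limit of 100 events per second, and a bucket capacity of 10); the function names are hypothetical.

```javascript
// pacing-comparison.js (illustrative simulation; all names are hypothetical)

// Fixed window: at most `limit` events per aligned window of `windowMs`
function fixedWindow(limit, windowMs) {
  let windowStart = 0;
  let count = 0;
  return (nowMs) => {
    if (nowMs - windowStart >= windowMs) {
      windowStart = nowMs - (nowMs % windowMs);
      count = 0;
    }
    if (count >= limit) return false;
    count += 1;
    return true;
  };
}

// Token bucket in integer milli-tokens to keep the arithmetic exact
function tokenBucket(capacity, ratePerSec) {
  let milliTokens = capacity * 1000;
  let last = 0;
  return (nowMs) => {
    milliTokens = Math.min(capacity * 1000, milliTokens + (nowMs - last) * ratePerSec);
    last = nowMs;
    if (milliTokens < 1000) return false;
    milliTokens -= 1000;
    return true;
  };
}

// Offer one event per millisecond for two seconds and track the longest silence
function longestGapMs(allow) {
  let previous = null;
  let longest = 0;
  for (let t = 0; t < 2000; t++) {
    if (!allow(t)) continue;
    if (previous !== null) longest = Math.max(longest, t - previous);
    previous = t;
  }
  return longest;
}

const fixedGap = longestGapMs(fixedWindow(100, 1000)); // 100 events per 1 s window
const bucketGap = longestGapMs(tokenBucket(10, 100));  // burst 10, 100 events/s sustained
console.log({ fixedGap, bucketGap });
```

Under these assumed parameters the fixed window goes silent for roughly 900 ms after each burst, while the bucket never pauses for more than 10 ms.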
Per-IP limiting at the connection layer (before HTTP) is necessary for public endpoints; per-stream (per authenticated session) limiting is required when a single user may open multiple tabs or parallel connections that together exhaust server capacity. See Connection Pooling for SSE Servers for complementary connection-count controls.
Step-by-Step Resolution
Step 1 — Implement the Core Token-Bucket Class
Each bucket tracks tokens (current allowance), capacity (maximum burst), refillRate (tokens per second), and a lastRefill timestamp. Refilling is lazy: it happens on consume(), not on a timer, so there is no background thread or setInterval cost.
// token-bucket.js
class TokenBucket {
  /**
   * @param {number} capacity – maximum burst (tokens that can accumulate)
   * @param {number} refillRate – tokens added per second (sustained throughput)
   */
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity; // start full so the first burst is allowed
    this.refillRate = refillRate;
    this.lastRefill = Date.now();
  }

  /** Returns true and consumes 1 token if available; false if the bucket is empty. */
  consume() {
    this._refill();
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }

  /** Returns milliseconds until at least 1 token is available (for callers that want to wait). */
  msUntilToken() {
    this._refill();
    if (this.tokens >= 1) return 0;
    return Math.ceil((1 - this.tokens) / this.refillRate * 1000);
  }

  /** Lazy refill: credit tokens for the time elapsed since the last call, capped at capacity. */
  _refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000; // seconds
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}

module.exports = { TokenBucket };
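To see the lazy refill at work without waiting on real time, the sketch below uses a variant of the class that takes a clock function. The ClockedBucket name and the now parameter are assumptions made for this example only; the arithmetic is the same as in the class above.

```javascript
// clocked-bucket-demo.js (illustrative; ClockedBucket is a hypothetical test variant)
class ClockedBucket {
  constructor(capacity, refillRate, now) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate; // tokens per second
    this.now = now;               // injected clock returning milliseconds
    this.lastRefill = now();
  }

  consume() {
    const t = this.now();
    const elapsedSec = (t - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillRate);
    this.lastRefill = t;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

let clockMs = 0;
const bucket = new ClockedBucket(3, 2, () => clockMs); // burst 3, 2 tokens/s

// Initial burst: three tokens available, the fourth call is refused
const burst = [bucket.consume(), bucket.consume(), bucket.consume(), bucket.consume()];

clockMs += 500; // 0.5 s at 2 tokens/s credits exactly one token
const afterHalfSecond = [bucket.consume(), bucket.consume()];

clockMs += 10000; // long idle period: the credit is capped at capacity (3), not 20
const afterIdle = [bucket.consume(), bucket.consume(), bucket.consume(), bucket.consume()];

console.log(burst, afterHalfSecond, afterIdle);
```

Injecting the clock is only a testing convenience; the production class can keep calling Date.now() directly.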
Step 2 — Build a Priority-Aware SSE Emitter
Wrap res.write() with the bucket check. When the bucket is empty or the socket is backpressured, buffer high-priority events in a capped queue and discard low-priority telemetry to avoid unbounded memory growth. Flush the queue when the Node.js drain event fires (the response's internal write buffer has emptied) or, if only the bucket was empty, after the refill delay. A false return from res.write() means the chunk was accepted but the caller should pause, so events that have been written are never re-queued.
// sse-emitter.js
const HIGH_PRIORITY_TYPES = new Set(['state_update', 'auth', 'error', 'transaction']);
const MAX_QUEUE_DEPTH = 200; // events; tune based on max acceptable latency
const RETRY_MS = 50; // fallback poll interval when the bucket cannot report a wait time

function createSSEEmitter(res, bucket) {
  const queue = [];
  let blocked = false; // true after res.write() returned false, until 'drain'
  let timer = null;

  function writeEvent(event) {
    // SSE wire format: named event + JSON payload
    const type = event.type ? `event: ${event.type}\n` : '';
    const id = event.id ? `id: ${event.id}\n` : '';
    // write() returning false still accepts the chunk; it only signals backpressure
    if (!res.write(`${id}${type}data: ${JSON.stringify(event.data)}\n\n`)) blocked = true;
  }

  function scheduleFlush() {
    if (timer || queue.length === 0) return;
    const wait = typeof bucket.msUntilToken === 'function' ? bucket.msUntilToken() : RETRY_MS;
    timer = setTimeout(() => { timer = null; flushQueue(); }, Math.max(wait, 1));
  }

  function flushQueue() {
    while (queue.length > 0 && !blocked && bucket.consume()) {
      writeEvent(queue.shift());
    }
    // Bucket empty but socket writable: no 'drain' will fire, so retry after the refill delay
    if (!blocked) scheduleFlush();
  }

  res.on('drain', () => { blocked = false; flushQueue(); });
  res.on('close', () => clearTimeout(timer));

  return function emit(event) {
    // Write directly only if nothing older is queued, the socket is writable, and a token is available
    if (queue.length === 0 && !blocked && bucket.consume()) {
      writeEvent(event);
      return;
    }
    // Otherwise apply the priority filter before buffering (queueing preserves order)
    if (HIGH_PRIORITY_TYPES.has(event.type) && queue.length < MAX_QUEUE_DEPTH) {
      queue.push(event);
      flushQueue();
    }
    // Low-priority events (telemetry) are silently dropped
  };
}

module.exports = { createSSEEmitter };
Step 3 — Wire Per-Stream and Per-IP Buckets into the HTTP Handler
Use two buckets: one per-IP (shared across all connections from that address) and one per-stream (this connection). Both must grant a token for an event to be emitted. Store IP buckets in a Map with a TTL cleanup to prevent memory leaks from abandoned clients.
// server.js
const http = require('http');
const { TokenBucket } = require('./token-bucket');
const { createSSEEmitter } = require('./sse-emitter');

// Per-IP buckets: 30 events/sec sustained, 60 burst
const ipBuckets = new Map();
const IP_RATE = 30;
const IP_BURST = 60;

// Evict stale entries every minute; a single timer for the whole process
setInterval(() => {
  const cutoff = Date.now() - 120_000;
  for (const [k, v] of ipBuckets) {
    if (v.lastSeen < cutoff) ipBuckets.delete(k);
  }
}, 60_000).unref();

function getIpBucket(ip) {
  if (!ipBuckets.has(ip)) {
    ipBuckets.set(ip, { bucket: new TokenBucket(IP_BURST, IP_RATE), lastSeen: Date.now() });
  }
  const entry = ipBuckets.get(ip);
  entry.lastSeen = Date.now();
  return entry.bucket;
}

http.createServer((req, res) => {
  if (req.url !== '/stream') {
    res.writeHead(404);
    return res.end();
  }
  // Behind a reverse proxy this is the proxy's address; derive the client IP
  // from a trusted forwarded header in that setup
  const ip = req.socket.remoteAddress;
  const ipBucket = getIpBucket(ip);
  const streamBucket = new TokenBucket(10, 5); // 5 events/sec per stream, 10 burst

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // prevent Nginx from buffering the response
  });
  res.flushHeaders(); // send headers immediately so the client enters OPEN state

  // Combined bucket: both IP and stream must have tokens
  const msUntilToken = () => Math.max(streamBucket.msUntilToken(), ipBucket.msUntilToken());
  const combinedBucket = {
    msUntilToken,
    consume() {
      // Check both first so a refusal by one bucket does not burn a token in the other
      if (msUntilToken() > 0) return false;
      streamBucket.consume();
      ipBucket.consume();
      return true;
    },
  };
  const emit = createSSEEmitter(res, combinedBucket);

  // Heartbeat every 25 s to prevent proxy idle timeout
  const heartbeat = setInterval(() => {
    res.write(': heartbeat\n\n'); // SSE comment; does not trigger onmessage
    getIpBucket(ip); // refresh lastSeen so the shared bucket is not evicted mid-stream
  }, 25_000);

  // Simulate an upstream producer at 20 events/sec ('update' is low priority, so excess is dropped)
  let seq = 0;
  const producer = setInterval(() => {
    emit({ type: 'update', id: String(++seq), data: { ts: Date.now() } });
  }, 50);

  req.on('close', () => {
    clearInterval(heartbeat);
    clearInterval(producer);
  });
}).listen(3000, () => console.log('SSE server on :3000'));
Step 4 — Apply Rate Limiting in Python (FastAPI / sse-starlette)
The same algorithm translates directly to Python for FastAPI services. The generator yields events only when a token is available; otherwise it awaits asyncio.sleep() for the refill period to avoid busy-waiting.
# rate_limited_sse.py
import asyncio
import json
import time

from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

app = FastAPI()


class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = float(capacity)
        self.refill_rate = refill_rate  # tokens/second
        self.last_refill = time.monotonic()

    def consume(self) -> bool:
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def seconds_until_token(self) -> float:
        """Seconds (not milliseconds) until at least one token is available."""
        self._refill()
        if self.tokens >= 1:
            return 0.0
        return (1 - self.tokens) / self.refill_rate

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now


async def event_generator(request: Request):
    bucket = TokenBucket(capacity=10, refill_rate=5)  # 5 events/sec, burst 10
    seq = 0
    while True:
        if await request.is_disconnected():
            break
        if bucket.consume():
            seq += 1
            yield {"event": "update", "id": str(seq), "data": json.dumps({"ts": time.time()})}
        else:
            wait = bucket.seconds_until_token()
            await asyncio.sleep(wait)  # yield control; do not busy-spin


@app.get("/stream")
async def stream(request: Request):
    return EventSourceResponse(event_generator(request))
Step 5 — Configure Nginx to Disable Response Buffering
Without this, Nginx accumulates events in its proxy buffer before forwarding, making the token bucket’s pacing invisible to the client and causing burst delivery anyway.
# nginx.conf (location block)
location /stream {
    proxy_pass http://backend:3000;
    proxy_buffering off;         # critical: do not buffer SSE responses
    proxy_cache off;
    proxy_read_timeout 3600s;    # hold connection open for up to 1 hour
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    add_header X-Accel-Buffering no;
}
Validation and Monitoring
Verify the token bucket is working at the wire level, not just in unit tests.
curl smoke test — confirm pacing:
# Should print one event roughly every 200ms (5 events/sec) after the initial burst
curl -N http://localhost:3000/stream | ts '%H:%M:%.S'
Load test with k6 — confirm per-IP enforcement:
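// k6-sse-ratelimit.js
import http from 'k6/http';

export const options = { vus: 50, duration: '30s' };

export default function () {
  // k6's http.get() waits for the complete response body, so each iteration
  // holds the stream open until the 10 s timeout fires.
  const res = http.get('http://localhost:3000/stream', {
    headers: { Accept: 'text/event-stream' },
    timeout: '10s',
  });
  // Watch for unexpected statuses. A 429 would mean the IP was rejected at connection
  // time, which requires an admission check the Step 3 server does not include;
  // requests cut off by the timeout typically report status 0.
  if (res.status !== 200) console.log(`status=${res.status}`);
}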
Metrics to instrument (Prometheus-style labels):
| Metric | Description | Alert Threshold |
|---|---|---|
| sse_tokens_consumed_total | Cumulative tokens consumed per stream | N/A (counter) |
| sse_queue_depth | Events waiting in the priority queue | > 100 sustained |
| sse_events_dropped_total | Low-priority events discarded | > 0.5% of produced |
| sse_bucket_empty_total | Times consume() returned false | Spikes indicate overload |
| sse_reconnect_rate | EventSource reconnect events per minute | > 5/min per client |
Expose these via a /metrics endpoint and alert on sse_queue_depth and sse_events_dropped_total. Pair with Idempotent Event ID Generation so clients can detect gaps during reconnection and request replay from the Last-Event-ID.
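As a rough sketch of what the /metrics body could look like, the block below keeps a few of the metrics from the table in a plain object and renders them in the Prometheus text exposition format. The inc, setGauge, and renderMetrics helpers are hypothetical; a real service would more likely rely on a metrics client library than on hand-rolled counters.

```javascript
// metrics-sketch.js (illustrative; helper names are hypothetical)
const metrics = {
  sse_tokens_consumed_total: { type: 'counter', value: 0 },
  sse_events_dropped_total: { type: 'counter', value: 0 },
  sse_bucket_empty_total: { type: 'counter', value: 0 },
  sse_queue_depth: { type: 'gauge', value: 0 },
};

function inc(name, by = 1) {
  metrics[name].value += by;
}

function setGauge(name, value) {
  metrics[name].value = value;
}

// One "# TYPE" line plus one sample line per metric
function renderMetrics() {
  return Object.entries(metrics)
    .map(([name, m]) => `# TYPE ${name} ${m.type}\n${name} ${m.value}\n`)
    .join('');
}

// Simulated activity: three events sent, one refused by the bucket and dropped
inc('sse_tokens_consumed_total', 3);
inc('sse_bucket_empty_total');
inc('sse_events_dropped_total');
setGauge('sse_queue_depth', 2);

const exposition = renderMetrics();
console.log(exposition);
```

In the emitter, the natural places to call these helpers would be next to each consume() result and wherever the queue length changes.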
Bucket tuning reference:
| Use case | capacity | refillRate | Notes |
|---|---|---|---|
| Real-time dashboard (low latency) | 5 | 10 | Small burst; 10 ev/s sustained |
| High-frequency telemetry | 50 | 25 | Absorbs upstream spikes |
| Chat / notifications | 3 | 2 | Low volume; generous burst |
| Per-IP shared limit (public API) | 60 | 30 | Covers ~3 tabs per user |
Always send a retry: field in the first event so the browser knows the reconnect delay when the rate limiter causes a stream close due to sustained overload. For Event ID & Retry Mechanism Design details, see the dedicated guide.
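A minimal sketch of a frame formatter that can carry the retry: field on the first event. The formatSSE name and the 5000 ms delay are assumptions for illustration; the field names (retry, id, event, data) are the standard SSE ones.

```javascript
// sse-frame.js (illustrative; formatSSE is a hypothetical helper)
function formatSSE({ id, type, data, retry }) {
  let frame = '';
  // retry: reconnection delay in milliseconds, digits only
  if (retry !== undefined) frame += `retry: ${Math.trunc(retry)}\n`;
  if (id !== undefined) frame += `id: ${id}\n`;
  if (type) frame += `event: ${type}\n`;
  // JSON.stringify without indentation never emits raw newlines, so one data line suffices
  frame += `data: ${JSON.stringify(data)}\n\n`;
  return frame;
}

// First event on a new stream announces the reconnect delay (assumed value)
const firstFrame = formatSSE({ retry: 5000, id: '1', type: 'state_update', data: { ok: true } });
// Later events omit it; the browser remembers the last value it received
const laterFrame = formatSSE({ id: '2', type: 'state_update', data: { ok: true } });
console.log(JSON.stringify(firstFrame));
```

A longer retry value spreads reconnect attempts out after an overload-driven close, at the cost of slower recovery for clients that dropped for unrelated reasons.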
⚡ Production Directives
- Set X-Accel-Buffering: no on every SSE response and mirror it in your Nginx proxy_buffering off directive; buffering at the proxy defeats all pacing logic.
- Use two independent buckets per request: one per authenticated session and one per IP, requiring both to grant a token before each res.write().
- Cap the priority queue at a fixed depth (200 events is a reasonable ceiling) and drop low-priority messages when the queue is full; do not grow the queue unboundedly.
- Never use setInterval to refill tokens; refill lazily in consume() using elapsed-time arithmetic to avoid timer drift and unnecessary CPU wakeups.
- Instrument sse_queue_depth and sse_events_dropped_total and alert before the queue saturates; by the time clients disconnect, you are already in a degraded state.
Frequently Asked Questions
Why not just use a sliding-window counter instead of a token bucket?
A sliding-window counter counts events in a rolling time window and rejects or delays once the count is exceeded. It uses a single number for both the burst size and the sustained rate: with a limit of 5 events per second, a producer that fires 10 events in 10 ms hits the limit even if the previous 990 ms were idle. The token bucket separates the two. It accumulates credit during idle periods (up to capacity), allowing that burst while still enforcing the sustained average. For continuous streams where events are naturally bursty, the token bucket produces smoother wire throughput and fewer false-positive rejections.
Should I return HTTP 429 or just drop events when the bucket is empty?
Do not close an open SSE stream to signal rate limiting: the browser's EventSource reconnects automatically after its retry delay, and many clients doing so at once create a reconnection storm that amplifies the load. Instead, silently drop low-priority events and buffer high-priority ones until the bucket refills. Reserve HTTP 429 for the initial connection request (before the stream opens) if the IP has exceeded its connection-establishment rate. Note that EventSource treats a non-200 response as a failed connection and does not retry by itself, so the client must schedule its own retry. Once the stream is open, manage rate limiting by pacing writes.
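One possible shape for that connection-time check is sketched below. The admitConnection function, the burst of 5 attempts, and the refill of 1 attempt per second are all assumptions for illustration; the handler would call it before writing the SSE headers and answer with the returned status.

```javascript
// connect-admission.js (illustrative; names and limits are hypothetical)
const CONNECT_BURST = 5; // assumed: connection attempts allowed back to back
const CONNECT_RATE = 1;  // assumed: attempts refilled per second
const attempts = new Map(); // ip -> { tokens, last }

function admitConnection(ip, nowMs) {
  const state = attempts.get(ip) ?? { tokens: CONNECT_BURST, last: nowMs };
  // Lazy refill, same arithmetic as the event-level bucket
  const elapsedSec = (nowMs - state.last) / 1000;
  state.tokens = Math.min(CONNECT_BURST, state.tokens + elapsedSec * CONNECT_RATE);
  state.last = nowMs;
  attempts.set(ip, state);

  if (state.tokens >= 1) {
    state.tokens -= 1;
    return { status: 200 };
  }
  // Tell well-behaved clients how long to wait before the next attempt
  const retryAfterSec = Math.ceil((1 - state.tokens) / CONNECT_RATE);
  return { status: 429, headers: { 'Retry-After': String(retryAfterSec) } };
}

// Six attempts in the same instant: five admitted, the sixth refused
const results = [];
for (let i = 0; i < 6; i++) results.push(admitConnection('203.0.113.7', 0));
const afterOneSecond = admitConnection('203.0.113.7', 1000);
console.log(results.map((r) => r.status), afterOneSecond.status);
```

Like the per-IP event buckets, this map would need the same kind of stale-entry eviction in a long-running process.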
How do I share token buckets across multiple Node.js worker processes?
An in-process Map works only in a single-process server. For multi-process or multi-instance deployments, store the bucket state in Redis using a Lua script that atomically checks and decrements a counter with a sliding expiry. The Redis Pub/Sub Fan-Out for SSE guide covers the Redis connection model you will need. A typical Lua script, where KEYS[1] is the bucket key passed to EVAL: local tokens = redis.call('GET', KEYS[1]); if tokens and tonumber(tokens) >= 1 then redis.call('DECR', KEYS[1]); return 1 end; return 0, combined with a periodic INCRBY on a TTL key to model refill.
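An alternative that avoids the periodic refill job is to keep the lazy-refill arithmetic inside the script itself. The sketch below is an assumption-heavy illustration, not the approach described above: the hash layout (tokens, ts), the argument order, and the takeToken mirror function are all hypothetical, and the Lua source has not been run against a live Redis here. The JavaScript function performs the same computation locally so the logic can be checked without a server.

```javascript
// redis-bucket-sketch.js (illustrative; key layout and argument order are assumptions)

// Lua source intended for EVAL with KEYS[1] = bucket key and
// ARGV = [capacity, refillRatePerSec, nowMs]. It is only defined here, not executed.
const TAKE_TOKEN_LUA = `
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local state = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(state[1]) or capacity
local ts = tonumber(state[2]) or now
tokens = math.min(capacity, tokens + (now - ts) / 1000 * rate)
local allowed = 0
if tokens >= 1 then
  tokens = tokens - 1
  allowed = 1
end
redis.call('HSET', KEYS[1], 'tokens', tostring(tokens), 'ts', tostring(now))
redis.call('PEXPIRE', KEYS[1], math.ceil(capacity / rate * 2000))
return allowed
`;

// Local mirror of the script's arithmetic; `state` is null for a key that does not exist yet
function takeToken(state, capacity, ratePerSec, nowMs) {
  const previousTokens = state ? state.tokens : capacity;
  const previousTs = state ? state.ts : nowMs;
  let tokens = Math.min(capacity, previousTokens + ((nowMs - previousTs) / 1000) * ratePerSec);
  const allowed = tokens >= 1;
  if (allowed) tokens -= 1;
  return { allowed, state: { tokens, ts: nowMs } };
}

// Capacity 2, 1 token/s: two immediate grants, one refusal, one grant after a second
const r1 = takeToken(null, 2, 1, 0);
const r2 = takeToken(r1.state, 2, 1, 0);
const r3 = takeToken(r2.state, 2, 1, 0);
const r4 = takeToken(r3.state, 2, 1, 1000);
console.log(r1.allowed, r2.allowed, r3.allowed, r4.allowed);
```

Passing the timestamp from the application assumes the instances' clocks agree closely enough; the expiry simply lets idle buckets disappear once they would have refilled completely.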
Does the token bucket work with HTTP/2 multiplexed streams?
Yes, but the unit of rate limiting changes. Under HTTP/2, all SSE streams from a browser to the same origin normally share one TCP connection. Per-stream token buckets still pace writes at the application layer (one bucket per response stream), but the kernel-level TCP send buffer is shared. If multiple streams on the same connection burst simultaneously, you may still saturate the shared TCP connection. Consider adding a per-connection (per-HTTP/2-session) aggregate bucket on top of the per-stream buckets to cap total throughput across all multiplexed streams for one client.
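Layering buckets this way works best when a refusal by one layer does not spend tokens in another. A minimal sketch, assuming each bucket exposes consume() and msUntilToken() as in Step 1; consumeAll and stubBucket are hypothetical names, and the stub has no refill because it exists only to make the demo deterministic.

```javascript
// layered-buckets.js (illustrative; consumeAll and stubBucket are hypothetical)

// Grants a token only if every layer can grant one; otherwise nothing is consumed
function consumeAll(buckets) {
  if (buckets.some((b) => b.msUntilToken() > 0)) return false;
  for (const b of buckets) b.consume();
  return true;
}

// Stand-in bucket with a fixed number of tokens and no refill
function stubBucket(tokens) {
  return {
    tokens,
    msUntilToken() { return this.tokens >= 1 ? 0 : 1000; }, // arbitrary wait when empty
    consume() {
      if (this.tokens < 1) return false;
      this.tokens -= 1;
      return true;
    },
  };
}

const streamBucket = stubBucket(5);  // one per response stream
const sessionBucket = stubBucket(1); // shared by all streams on one HTTP/2 session

const firstOk = consumeAll([streamBucket, sessionBucket]);  // both layers grant
const secondOk = consumeAll([streamBucket, sessionBucket]); // session layer is empty
console.log(firstOk, secondOk, streamBucket.tokens);
```

The session bucket would need to be looked up by whatever object identifies the shared connection in your server; in Node's HTTP/2 compatibility API that is probably req.stream.session, but treat that as an assumption to verify.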