Broadcasting SSE Events with Redis Pub/Sub
You have an SSE endpoint running on multiple server processes, and clients connected to one node are not receiving events published on another. The symptom is selective delivery: when the publisher writes to pod A's in-process event bus, a client on pod A sees the event and a client on pod B sees nothing. Redis pub/sub solves cross-process fan-out by acting as the shared message bus, but wiring it to SSE response streams introduces two architectural decisions with real performance consequences: per-connection subscriber versus shared subscriber, and the structure of your JSON envelope.
This guide walks through both patterns end-to-end in Node.js and Python, with explicit tradeoffs so you pick the right one for your load profile.
Symptom and Developer Intent
The immediate trigger is usually one of these observations:
- EventSource on the browser receives no events after deploying more than one server replica behind a load balancer.
- Events fire intermittently, appearing only for clients whose request happened to land on the same node as the publisher.
- A single-node staging environment works perfectly; production with horizontal scaling breaks.
The intent: every connected SSE client must receive every event published to a given channel regardless of which server pod holds the connection. Redis pub/sub, combined with an appropriate fan-out architecture, provides exactly that guarantee.
Root Cause Analysis
SSE connections are long-lived HTTP responses held in memory on one process. Each process maintains its own set of response objects (Node.js) or generator iterators (Python/FastAPI). In-process event emitters (EventEmitter, asyncio.Queue) cannot cross OS process boundaries. When you horizontally scale to N replicas, a publisher writing to process 1's in-memory bus cannot reach clients on processes 2…N.
Redis pub/sub is a fire-and-forget message bus: publishers call PUBLISH channel message, and every subscriber on every node that called SUBSCRIBE channel receives the payload over its own subscriber connection. The key insight is that the Redis subscriber lives on the server, not in the browser: it receives the raw payload and immediately writes it out in the SSE wire format.
The architectural choice is where the subscriber lives relative to your connections:
| Pattern | Redis connections | CPU per publish | Max connections | Reconnect cost |
|---|---|---|---|---|
| Per-connection subscriber | 1 per SSE client | O(1) per client | ~10 k (Redis default) | Low: subscribe on connect |
| Shared subscriber (fan-out in process) | 1 per server process | O(n clients on channel) | Unbounded | Medium: must re-register callback |
Per-connection works well under low connection counts (<1 k) with many channels (multi-tenant apps). Shared subscriber is mandatory above ~10 k connections because Redis enforces a maxclients limit and each idle pub/sub connection consumes ~50 KB on the Redis server.
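The connection arithmetic behind that threshold can be sketched in a few lines. This is a rough estimate only: it assumes each server process multiplexes all of its channels over one subscriber connection (as the shared-subscriber code in Step 2 does) and uses the 10 000 maxclients default mentioned above. The function names and the headroom parameter are illustrative, not part of any library.

```python
def redis_connections_needed(pattern: str, sse_clients: int, server_processes: int) -> int:
    """Estimate how many Redis subscriber connections a fan-out pattern opens."""
    if pattern == "per-connection":
        return sse_clients        # one Redis connection per SSE client
    if pattern == "shared":
        return server_processes   # assumption: one subscriber connection per process
    raise ValueError(f"unknown pattern: {pattern!r}")


def fits_maxclients(needed: int, maxclients: int = 10_000, headroom: float = 0.8) -> bool:
    """True if the estimate stays under a safety fraction of maxclients."""
    return needed <= maxclients * headroom
```

With 12 000 SSE clients spread over 8 processes, the per-connection estimate exceeds the default limit while the shared estimate needs only 8 connections.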
For scaling beyond a single Redis node see Scaling SSE Across Multiple Nodes with Redis.
Step-by-Step Resolution
Step 1: Define a JSON Envelope
Every message published to Redis should carry metadata the server can forward directly or transform before writing to the SSE wire.
{
  "event": "price-update",
  "data": { "ticker": "AAPL", "price": 182.45 },
  "id": "01HZ9K4M2P8RX3Q5V7W0Y6B1C3",
  "retry": 3000
}
Fields:
- event maps to the SSE event: field. Omit it for the default unnamed event.
- data is the payload; stringify it into one or more SSE data: lines.
- id correlates with Idempotent Event ID Generation; clients send Last-Event-ID on reconnect.
- retry is optional; it overrides the client's reconnect delay per the Event ID & Retry Mechanism Design spec.
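The mapping from envelope fields to SSE lines can be sketched as a single pure function. This is a minimal illustration rather than the guide's canonical serializer: the function name is hypothetical, and rejecting line breaks in id and event is an added safety assumption so that a crafted value cannot inject extra SSE fields.

```python
import json


def envelope_to_sse_frame(envelope: dict) -> str:
    """Serialize a JSON envelope (already parsed to a dict) into one SSE frame."""
    lines = []
    for field in ("id", "event"):
        value = envelope.get(field)
        if value:
            text = str(value)
            if "\n" in text or "\r" in text:
                raise ValueError(f"{field} must not contain line breaks")
            lines.append(f"{field}: {text}")
    if envelope.get("retry") is not None:
        lines.append(f"retry: {int(envelope['retry'])}")
    data = envelope.get("data", "")
    # Strings pass through unchanged; everything else is JSON-encoded compactly
    text = data if isinstance(data, str) else json.dumps(data, separators=(",", ":"))
    for line in text.split("\n"):
        lines.append(f"data: {line}")  # one data: line per payload line
    return "\n".join(lines) + "\n\n"   # blank line terminates the event
```

Keeping the serializer free of I/O makes it easy to unit-test independently of Redis and the HTTP layer.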
Step 2: Node.js Shared Subscriber Pattern
This implementation maintains one node-redis subscriber connection per process, subscribes to each channel once, and fans out to all connected response objects via a Map.
// fanout.js: shared Redis subscriber, Node.js 20+ (node-redis)
import { createClient } from 'redis';
import http from 'node:http';

const publisher = createClient({ url: process.env.REDIS_URL });
const subscriber = publisher.duplicate();
// node-redis emits 'error' events; an unhandled one crashes the process
publisher.on('error', (err) => console.error('redis publisher error', err));
subscriber.on('error', (err) => console.error('redis subscriber error', err));
await publisher.connect();
await subscriber.connect();

// Map<channel, Set<http.ServerResponse>>
const connections = new Map();

function registerClient(channel, res) {
  if (!connections.has(channel)) {
    connections.set(channel, new Set());
    // Subscribe only when the first client joins this channel
    subscriber
      .subscribe(channel, (message) => fanOut(channel, message))
      .catch((err) => console.error(`subscribe ${channel} failed`, err));
  }
  connections.get(channel).add(res);
}

function unregisterClient(channel, res) {
  const set = connections.get(channel);
  if (!set) return;
  set.delete(res);
  if (set.size === 0) {
    connections.delete(channel);
    // Unsubscribe to release Redis resources when channel is empty
    subscriber
      .unsubscribe(channel)
      .catch((err) => console.error(`unsubscribe ${channel} failed`, err));
  }
}

function fanOut(channel, rawMessage) {
  const set = connections.get(channel);
  if (!set) return;
  let envelope;
  try {
    envelope = JSON.parse(rawMessage);
  } catch {
    return; // ignore malformed payloads instead of throwing in the listener
  }
  // Build the SSE frame once, reuse for all subscribers
  let frame = '';
  if (envelope.id) frame += `id: ${envelope.id}\n`;
  if (envelope.event) frame += `event: ${envelope.event}\n`;
  // Multi-line data: split on newline per the SSE spec
  const dataStr = typeof envelope.data === 'object'
    ? JSON.stringify(envelope.data)
    : String(envelope.data);
  for (const line of dataStr.split('\n')) {
    frame += `data: ${line}\n`;
  }
  if (envelope.retry) frame += `retry: ${envelope.retry}\n`;
  frame += '\n'; // blank line terminates the event
  for (const res of set) {
    if (!res.writableEnded) {
      res.write(frame);
    }
  }
}

// HTTP SSE endpoint
http.createServer((req, res) => {
  const url = new URL(req.url, 'http://localhost');
  const channel = url.searchParams.get('channel') ?? 'global';
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // disable Nginx buffering
  });
  res.flushHeaders();
  // Send a comment heartbeat every 25 s to keep proxies alive
  const hb = setInterval(() => res.write(': heartbeat\n\n'), 25_000);
  registerClient(channel, res);
  req.on('close', () => {
    clearInterval(hb);
    unregisterClient(channel, res);
  });
}).listen(3000);
The X-Accel-Buffering: no header is critical: without it, Nginx buffers the response body until the connection closes, and clients see no events. See Buffer Management & Chunked Transfer Encoding for the full proxy matrix.
Step 3: Node.js Per-Connection Subscriber Pattern
Use this when each user subscribes to a personalized channel (e.g., user:{userId}:events) and you have fewer than 5 k concurrent users.
// per-connection.js
import { createClient } from 'redis';
import http from 'node:http';

http.createServer(async (req, res) => {
  const url = new URL(req.url, 'http://localhost');
  // Demo only: in production derive userId from the authenticated session
  const userId = url.searchParams.get('userId');
  if (!userId) { res.writeHead(400); res.end(); return; }
  const channel = `user:${userId}:events`;

  // Dedicated subscriber per connection
  const sub = createClient({ url: process.env.REDIS_URL });
  sub.on('error', (err) => console.error('redis subscriber error', err));
  try {
    await sub.connect();
  } catch {
    // Avoid an unhandled rejection if Redis is unreachable
    res.writeHead(503); res.end(); return;
  }

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  });
  res.flushHeaders();

  await sub.subscribe(channel, (message) => {
    const envelope = JSON.parse(message);
    const dataStr = typeof envelope.data === 'object'
      ? JSON.stringify(envelope.data)
      : String(envelope.data);
    let frame = '';
    if (envelope.id) frame += `id: ${envelope.id}\n`;
    if (envelope.event) frame += `event: ${envelope.event}\n`;
    for (const line of dataStr.split('\n')) frame += `data: ${line}\n`;
    frame += '\n';
    if (!res.writableEnded) res.write(frame);
  });

  req.on('close', async () => {
    await sub.unsubscribe(channel);
    await sub.disconnect();
  });
}).listen(3001);
Step 4: Python Shared Subscriber with asyncio and FastAPI
# fanout.py: Python 3.12, redis-py 5.x, FastAPI 0.110+
import asyncio
import json
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import redis.asyncio as aioredis
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from redis.asyncio.client import PubSub

REDIS_URL = "redis://localhost:6379"
redis_client: aioredis.Redis
pubsub: PubSub  # one shared subscriber connection for every channel

# channel -> set of asyncio.Queue
subscribers: dict[str, set[asyncio.Queue]] = {}
sub_lock = asyncio.Lock()


async def redis_listener():
    """Single coroutine that reads every subscribed channel and dispatches."""
    while True:
        if not pubsub.subscribed:
            # Nothing subscribed yet; reading before the first SUBSCRIBE fails
            await asyncio.sleep(0.1)
            continue
        message = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=1.0
        )
        if message is None or message["type"] != "message":
            continue
        queues = subscribers.get(message["channel"], set())
        for q in list(queues):
            try:
                q.put_nowait(message["data"])
            except asyncio.QueueFull:
                pass  # slow consumer: drop or use backpressure


@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client, pubsub
    # decode_responses=True so channel names arrive as str, matching the dict keys
    redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)
    pubsub = redis_client.pubsub()
    listener = asyncio.create_task(redis_listener())
    yield
    listener.cancel()
    await pubsub.aclose()
    await redis_client.aclose()


app = FastAPI(lifespan=lifespan)


async def event_generator(
    channel: str, request: Request
) -> AsyncGenerator[str, None]:
    q: asyncio.Queue = asyncio.Queue(maxsize=256)
    async with sub_lock:
        if channel not in subscribers:
            subscribers[channel] = set()
            # Subscribe on the same PubSub object the listener reads from
            await pubsub.subscribe(channel)
        subscribers[channel].add(q)
    try:
        while not await request.is_disconnected():
            try:
                raw = await asyncio.wait_for(q.get(), timeout=25.0)
                envelope = json.loads(raw)
                data_str = (
                    json.dumps(envelope["data"])
                    if isinstance(envelope.get("data"), (dict, list))
                    else str(envelope.get("data", ""))
                )
                frame = ""
                if envelope.get("id"):
                    frame += f'id: {envelope["id"]}\n'
                if envelope.get("event"):
                    frame += f'event: {envelope["event"]}\n'
                for line in data_str.split("\n"):
                    frame += f"data: {line}\n"
                frame += "\n"
                yield frame
            except asyncio.TimeoutError:
                yield ": heartbeat\n\n"  # keep proxy connections alive
    finally:
        async with sub_lock:
            subscribers[channel].discard(q)
            if not subscribers[channel]:
                del subscribers[channel]
                # Last local client left: release the Redis subscription
                await pubsub.unsubscribe(channel)


@app.get("/events")
async def sse_endpoint(channel: str, request: Request):
    return StreamingResponse(
        event_generator(channel, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
The asyncio.QueueFull path is where you implement backpressure handling: either drop the event, close the connection, or buffer to disk.
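One possible drop policy is to evict the oldest queued event so that a slow client always sees the most recent data. The helper below is a sketch of that idea, not the guide's prescribed approach; the name offer_drop_oldest is hypothetical, and whether dropping old events is acceptable depends on your event semantics.

```python
import asyncio


def offer_drop_oldest(q: asyncio.Queue, item) -> bool:
    """Enqueue item; if the queue is full, evict the oldest entry first.

    Returns True if nothing was dropped, False if an old item was evicted.
    """
    try:
        q.put_nowait(item)
        return True
    except asyncio.QueueFull:
        q.get_nowait()      # discard the oldest pending event
        q.put_nowait(item)  # a slot was just freed, so this succeeds
        return False
```

In the listener loop this would replace the bare q.put_nowait(...) call, and the False return value is a natural place to increment a dropped-events metric.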
Step 5: Publish from Any Service
Any process (a worker, a webhook handler, a cron job) can publish to the channel. The SSE servers pick it up automatically.
# publisher.py
import asyncio
import json

import redis.asyncio as aioredis


async def publish_event(channel: str, event: str, data: dict, event_id: str):
    r = aioredis.from_url("redis://localhost:6379")
    envelope = json.dumps({"event": event, "data": data, "id": event_id})
    await r.publish(channel, envelope)
    await r.aclose()


# Example usage
asyncio.run(publish_event(
    channel="price-updates",
    event="price-update",
    data={"ticker": "AAPL", "price": 182.45},
    event_id="01HZ9K4M2P8RX3Q5V7W0Y6B1C3",
))

// publisher.mjs (Node.js)
import { createClient } from 'redis';

const pub = createClient({ url: process.env.REDIS_URL });
await pub.connect();
await pub.publish('price-updates', JSON.stringify({
  event: 'price-update',
  data: { ticker: 'AAPL', price: 182.45 },
  id: '01HZ9K4M2P8RX3Q5V7W0Y6B1C3',
}));
await pub.disconnect();
Validation and Monitoring
curl Smoke Test
# Terminal 1: subscribe
curl -N -H "Accept: text/event-stream" \
  "http://localhost:3000/?channel=price-updates"
# Terminal 2: publish
redis-cli PUBLISH price-updates \
  '{"event":"price-update","data":{"ticker":"AAPL","price":182.45},"id":"abc123"}'
You should see in Terminal 1 within milliseconds:
id: abc123
event: price-update
data: {"ticker":"AAPL","price":182.45}
Redis Diagnostics
# Confirm subscription is active on the Redis server
redis-cli CLIENT LIST | grep -i sub
# Monitor all PUBLISH commands in real time (low-traffic only)
redis-cli MONITOR | grep PUBLISH
# Count active pub/sub clients
redis-cli INFO clients | grep connected_clients
redis-cli PUBSUB NUMSUB price-updates # subscriber count per channel
Node.js Unit Test Stub
// fanout.test.mjs (Node.js built-in test runner)
import { test } from 'node:test';
import assert from 'node:assert/strict';

test('fanOut writes correct SSE frame', () => {
  const frames = [];
  const fakeRes = {
    writableEnded: false,
    write: (chunk) => frames.push(chunk),
  };
  // Inline the frame builder from fanout.js for isolated unit test
  const envelope = { event: 'ping', data: { ts: 1 }, id: 'x1' };
  const dataStr = JSON.stringify(envelope.data);
  let frame = `id: ${envelope.id}\nevent: ${envelope.event}\n`;
  for (const line of dataStr.split('\n')) frame += `data: ${line}\n`;
  frame += '\n';
  if (!fakeRes.writableEnded) fakeRes.write(frame);
  assert.equal(frames[0], 'id: x1\nevent: ping\ndata: {"ts":1}\n\n');
});
Browser DevTools Check
Open Network → Filter: Fetch/XHR → select the SSE request → EventStream tab. Each published message should appear as a separate row with the correct type, data, and id.
Monitor connection count and channel subscriber counts via PUBSUB NUMSUB from your metrics scraper. Alert if subscriber count drops to 0 while clients are connected; that indicates a silent UNSUBSCRIBE caused by a Redis timeout.
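The alert condition can be expressed as a small pure function that a metrics scraper might call. This sketch assumes numsub is a list of (channel, count) pairs, which is the shape redis-py's pubsub_numsub is expected to return with decode_responses=True, and that local_clients is a per-channel count your SSE servers report themselves; both names are hypothetical.

```python
def dropped_subscriptions(
    numsub: list[tuple[str, int]], local_clients: dict[str, int]
) -> list[str]:
    """Return channels that have connected SSE clients but zero Redis subscribers."""
    redis_counts = dict(numsub)
    return sorted(
        channel
        for channel, clients in local_clients.items()
        if clients > 0 and redis_counts.get(channel, 0) == 0
    )
```

Any channel returned here is a candidate for an alert and for a forced resubscribe on the affected server process.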
⚡ Production Directives
- Set X-Accel-Buffering: no on your SSE responses (or proxy_buffering off in the Nginx location block); without it, clients receive events only after the connection closes.
- Use the shared-subscriber pattern above 5 k connections; per-connection subscribers will exhaust Redis maxclients and cause ERR max number of clients reached errors.
- Send a comment heartbeat (: heartbeat\n\n) every 20–30 s to prevent load balancers and mobile network stacks from silently closing idle connections.
- Monitor PUBSUB NUMSUB <channel> in your metrics pipeline; a zero count while clients are online signals a dropped subscription that no client disconnect triggered.
- Always call UNSUBSCRIBE in the connection close / disconnect handler; leaked subscriptions accumulate until Redis restarts and consume memory at ~500 bytes each.
Verification Checklist
Frequently Asked Questions
What happens to clients if Redis restarts?
With the shared-subscriber pattern the server's subscriber connection drops, raising a connection error. If you use your client library's auto-reconnect (node-redis and ioredis both reconnect by default) or redis-py's retry-on-error, the connection re-establishes within seconds. During that window no events reach clients. Clients that reconnect via the EventSource retry mechanism send Last-Event-ID, but events published during the outage are lost because Redis pub/sub has no persistence. Combine with a Redis Stream (XADD/XREAD) if you need replay on reconnect.
Should I use Redis Streams (XADD/XREAD) instead of pub/sub?
Yes, if you need durable delivery or replay for clients that reconnect after a gap. Pub/sub is fire-and-forget: messages published while no subscriber is listening are lost. Streams persist messages and let you replay from a given ID, which pairs naturally with SSE's Last-Event-ID header. Use pub/sub when events are ephemeral (live scores, cursor positions) and Streams when clients must not miss messages (order status, audit logs).
How do I authenticate which channels a client may subscribe to?
Validate the channel name in your HTTP handler before calling subscribe(). Extract the authenticated user identity from a JWT in the Authorization header or a signed cookie (see Authenticating SSE Streams with Tokens & Cookies), then assert that userId matches the channel pattern (e.g., user:{userId}:events). Never let the client supply an arbitrary channel name without server-side validation.
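The channel check itself can be a small pure function that runs after authentication and before any subscribe() call. The sketch below assumes the user:{userId}:events naming used in this guide plus a hypothetical allow-list of public channels; the character set permitted in user IDs is also an assumption you should adapt.

```python
import re

# Assumption: user IDs are limited to letters, digits, underscore and hyphen
_USER_CHANNEL = re.compile(r"user:([A-Za-z0-9_-]+):events")
PUBLIC_CHANNELS = frozenset({"global", "price-updates"})  # hypothetical allow-list


def may_subscribe(authenticated_user_id: str, channel: str) -> bool:
    """Allow public channels, or the caller's own personal channel only."""
    if channel in PUBLIC_CHANNELS:
        return True
    match = _USER_CHANNEL.fullmatch(channel)
    return match is not None and match.group(1) == authenticated_user_id
```

Here authenticated_user_id must come from the verified token or cookie, never from a query parameter; a False result should map to a 403 response before the SSE headers are written.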
How many channels can one Redis instance handle?
Redis has no hard limit on channel count, but each active subscription adds a small amount of memory (~100 bytes per pattern-subscriber pair). The binding constraint is usually maxclients (default 10 000) for per-connection patterns, or the number of subscriber connections for shared patterns. A single Redis node can handle tens of thousands of channels with shared subscribers; for millions of channels or extreme throughput, use Redis Cluster with channel-key sharding.
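For intuition about how a cluster could spread channels, the sketch below computes the Redis Cluster hash slot of a channel name: CRC16 (XMODEM variant) of the key modulo 16384, hashing only the hash-tag portion when a non-empty {...} section is present. It assumes you are using sharded pub/sub (SPUBLISH/SSUBSCRIBE in Redis 7.0+), where channels are routed by slot; classic PUBLISH in a cluster is broadcast to every node instead. The function name is illustrative.

```python
import binascii


def hash_slot(channel: str) -> int:
    """Return the Redis Cluster hash slot (0..16383) for a channel name."""
    key = channel.encode()
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        if end > start + 1:  # non-empty hash tag between the braces
            key = key[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is the CRC16/XMODEM variant
    return binascii.crc_hqx(key, 0) % 16384
```

Channels that share a hash tag, such as {user:42}:events and {user:42}:alerts, land on the same slot, so one shard serves all of that user's channels.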