Streaming SSE Responses with FastAPI and sse-starlette
Part of Python FastAPI SSE Implementation Guide.
FastAPI does not ship SSE support out of the box. Returning a StreamingResponse with text/event-stream content works up to a point, but you quickly run into three problems: Starlette’s default StreamingResponse does not set the Cache-Control: no-cache header that SSE responses are expected to send, it gives you no SSE-aware disconnect handling, and its output can be silently buffered under certain ASGI and proxy configurations. The sse-starlette library (pip install sse-starlette) wraps Starlette’s response machinery in an EventSourceResponse class that addresses all three issues and integrates directly with FastAPI’s async generator pattern.
This guide walks from a broken naive implementation to a production-ready endpoint with proper generator cleanup, client disconnect detection, and a correctly tuned uvicorn/gunicorn deployment.
Symptom & Developer Intent
You wrote a FastAPI route like this:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
async def stream():
    async def gen():
        for i in range(100):
            yield f"data: {i}\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")
The problems you see in practice:
- Nginx / proxy buffering: events arrive in batches or only after the connection closes. Without an X-Accel-Buffering: no header, the proxy holds chunks.
- No retry: field: the browser EventSource falls back to its own default reconnection delay (a few seconds) rather than the one you want.
- Generator leaks on disconnect: when the client closes the tab, your async generator can keep running (consuming memory, hitting databases, or publishing to Redis) until the process shuts down or a timeout triggers.
- Missing Cache-Control: no-cache: expected on SSE responses; some CDNs cache the response body without it.
- Missing Last-Event-ID handling: resumed connections send Last-Event-ID in the request header; naively ignoring it breaks the event ID & retry mechanism.
Root Cause Analysis
Why StreamingResponse falls short
Starlette’s StreamingResponse is a generic chunked HTTP response. It streams an async iterable faithfully, but it is unaware of the text/event-stream contract:
| Concern | StreamingResponse | EventSourceResponse (sse-starlette) |
|---|---|---|
| Content-Type | Must set manually | text/event-stream; charset=utf-8 |
| Cache-Control: no-cache | Not set | Set automatically |
| X-Accel-Buffering: no | Not set | Set automatically |
| Connection: keep-alive | Not set | Set automatically |
| Disconnect detection | None | asyncio.CancelledError propagated to generator |
| event: / id: / retry: framing | Manual \n\n strings | Accepts dict or ServerSentEvent objects |
ASGI disconnect propagation
Under ASGI (the protocol FastAPI/Starlette speak), a client disconnect arrives as an http.disconnect message on the receive channel. EventSourceResponse runs a background asyncio.Task that waits on receive(). When http.disconnect fires, it cancels the generator task, which raises asyncio.CancelledError in the generator’s yield expression. If you write a try/finally block around your generator’s inner loop, finally runs on disconnect — the correct place for cleanup (closing DB cursors, unsubscribing from Redis, etc.).
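The cancellation path described above can be reproduced with plain asyncio, without any web framework. The following is a minimal sketch, not sse-starlette's actual code: `ticker`, `consume`, and `cleanup_log` are hypothetical names, and the explicit `task.cancel()` stands in for what happens when `http.disconnect` arrives.

```python
import asyncio

cleanup_log = []  # hypothetical stand-in for "resources released"


async def ticker():
    """Async generator with cleanup in finally, like an SSE event source."""
    n = 0
    try:
        while True:
            yield n
            n += 1
            await asyncio.sleep(0.01)  # the task is suspended here when cancelled
    finally:
        # Runs when CancelledError is raised inside the generator.
        cleanup_log.append("released")


async def consume(gen, received):
    """Plays the role of the response task that iterates the generator."""
    async for item in gen:
        received.append(item)


async def main():
    received = []
    task = asyncio.create_task(consume(ticker(), received))
    await asyncio.sleep(0.05)
    task.cancel()  # simulates the reaction to http.disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received


received = asyncio.run(main())
print(received, cleanup_log)
```

After the run, `cleanup_log` contains a single `"released"` entry: the `finally` block executed as the cancellation unwound through the generator.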
Uvicorn buffering defaults
Uvicorn 0.20+ streams chunks as they arrive, so delayed events usually point to buffering elsewhere in the stack. The --limit-concurrency and --backlog (default 2048) options cap how many connections are accepted or queued; they do not control response buffering. Under gunicorn with uvicorn workers, the keep-alive timeout (5 s by default in uvicorn) is short relative to typical proxy idle timeouts, so raise it for deployments that hold long-lived SSE connections.
Step-by-Step Resolution
Step 1 — Install sse-starlette
pip install "sse-starlette>=1.8.0"
# or pin below the next major version:
pip install "sse-starlette>=1.8.0,<2.0"
Confirm the installed version:
python -c "import sse_starlette; print(sse_starlette.__version__)"
Step 2 — Write the async generator
Your generator yields either plain strings (interpreted as the data: field) or dict / ServerSentEvent objects. Using dicts keeps the code readable:
import asyncio
from typing import AsyncGenerator

async def event_generator(request) -> AsyncGenerator[dict, None]:
    """
    Yield SSE-framed events until the client disconnects.
    Always clean up resources in the finally block.
    """
    counter = 0
    try:
        while True:
            # Check disconnect before blocking work
            if await request.is_disconnected():
                break
            yield {
                "event": "update",   # maps to event: update
                "id": str(counter),  # maps to id: <n>
                "retry": 3000,       # maps to retry: 3000 (ms)
                "data": f"counter={counter}",
            }
            counter += 1
            await asyncio.sleep(1)  # replace with real async I/O
    finally:
        # Release DB connections, Redis subscriptions, etc.
        pass
request.is_disconnected() checks the ASGI receive channel and returns True once the server has delivered an http.disconnect message for this connection. Calling it before each yield avoids pushing data into a dead socket.
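To make the dict-to-wire mapping concrete, here is a rough sketch of how such a dict could be framed as SSE text. `format_sse` is a hypothetical helper written for illustration only; it is not sse-starlette's encoder, and the library's field order and line separator may differ.

```python
def format_sse(event: dict) -> str:
    """Frame a dict as one SSE event (illustrative only)."""
    lines = []
    if "comment" in event:
        # Lines starting with ':' are comments and are ignored by EventSource.
        lines.append(f": {event['comment']}")
    for field in ("id", "event", "retry"):
        if field in event:
            lines.append(f"{field}: {event[field]}")
    if "data" in event:
        # Multi-line payloads need one data: line per line of text.
        for chunk in str(event["data"]).splitlines() or [""]:
            lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


print(format_sse({"event": "update", "id": "0", "retry": 3000, "data": "counter=0"}), end="")
```

Splitting multi-line data across several `data:` lines matters because a bare newline inside a payload would otherwise end the field early.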
Step 3 — Return an EventSourceResponse
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/events")
async def events(request: Request):
    generator = event_generator(request)
    return EventSourceResponse(generator)
EventSourceResponse accepts an async iterable and sets all required headers automatically. No extra media_type argument is needed.
Step 4 — Handle Last-Event-ID for resumable streams
When a client reconnects after a drop, the browser sends the Last-Event-ID request header. Read it to replay missed events:
import asyncio

from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/events")
async def events(request: Request):
    # The header is absent on the first connection and present on reconnects.
    last_id_raw = request.headers.get("Last-Event-ID")
    try:
        next_id = int(last_id_raw) + 1  # replay starts after the last received ID
    except (TypeError, ValueError):
        next_id = 0  # first connection or unparsable header: start from 0

    async def resumable_generator():
        counter = next_id
        try:
            while True:
                if await request.is_disconnected():
                    break
                yield {
                    "event": "update",
                    "id": str(counter),
                    "retry": 5000,
                    "data": f"value={counter}",
                }
                counter += 1
                await asyncio.sleep(0.5)
        finally:
            pass  # cleanup

    return EventSourceResponse(resumable_generator())
For true replay you would query a persistent store (Redis stream with XRANGE, Postgres, etc.) between last_id and the current head before entering the live-tail loop. See Broadcasting SSE Events with Redis Pub/Sub for a complete fan-out pattern.
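The replay step can be sketched independently of any particular store. In the example below, `EVENT_LOG` is a hypothetical in-memory stand-in for the persistent store, and `parse_last_event_id` and `missed_events` are illustrative helper names. A real implementation would replace the list comprehension with an XRANGE call or a SQL query.

```python
from typing import Optional

# Hypothetical in-memory stand-in for a persistent store (Redis stream, Postgres table, ...).
EVENT_LOG = [{"id": i, "data": f"value={i}"} for i in range(5)]


def parse_last_event_id(raw: Optional[str]) -> Optional[int]:
    """Return the integer ID from the header, or None if it is absent or malformed."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def missed_events(log: list, last_id: Optional[int]) -> list:
    """Events the client has not seen yet, shaped like the dicts the generator yields."""
    return [
        {"event": "update", "id": str(entry["id"]), "data": entry["data"]}
        for entry in log
        if last_id is None or entry["id"] > last_id
    ]


# A client that last saw ID 2 gets IDs 3 and 4 replayed before the live tail starts.
for event in missed_events(EVENT_LOG, parse_last_event_id("2")):
    print(event)
```

Inside the generator, these replayed dicts would be yielded first, followed by the live-tail loop.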
Step 5 — Configure uvicorn for long-lived connections
# Development
uvicorn main:app --host 0.0.0.0 --port 8000 --log-level info

# Production (adjust --workers for your hardware)
# --timeout-keep-alive 75: at least as long as any proxy idle timeout (nginx default 75 s)
# --limit-concurrency 1000: reject excess connections with 503 before OOM
uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --timeout-keep-alive 75 \
  --limit-concurrency 1000
With gunicorn managing uvicorn workers:
# --timeout 0: disable gunicorn's worker timeout for SSE
# --keep-alive 75: match uvicorn's keep-alive above
gunicorn main:app \
  -k uvicorn.workers.UvicornWorker \
  -w 4 \
  --timeout 0 \
  --keep-alive 75 \
  --bind 0.0.0.0:8000
--timeout 0 is the critical flag: gunicorn’s default 30-second worker timeout kills SSE connections that are idle (no events) longer than the timeout, even though the connection is healthy.
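If you prefer a configuration file to a long command line, gunicorn reads settings from a Python file. The sketch below mirrors the flags shown above; the file name gunicorn.conf.py and the values are assumptions to adapt to your deployment.

```python
# gunicorn.conf.py (sketch): same settings as the command-line flags
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 0     # disable the worker timeout for long-lived SSE connections
keepalive = 75  # seconds to keep idle keep-alive connections open
```

Start the server with `gunicorn -c gunicorn.conf.py main:app`.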
Step 6 — Tune nginx to stop buffering
If nginx sits in front, add these directives to your location block:
location /events {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";   # enable HTTP/1.1 keep-alive
    proxy_buffering off;              # disable proxy buffering
    proxy_cache off;
    proxy_read_timeout 3600s;         # hold SSE connections open for 1 h
    add_header X-Accel-Buffering no;  # belt-and-suspenders for nginx accel
}
Without proxy_buffering off, nginx accumulates chunks in its own buffer until the buffer fills or the upstream closes — SSE clients see nothing until then. See Buffer Management & Chunked Transfer Encoding for the full mechanics.
Validation & Monitoring
Verify headers with curl
curl -N -i http://localhost:8000/events
Expected response headers:
HTTP/1.1 200 OK
content-type: text/event-stream; charset=utf-8
cache-control: no-cache
connection: keep-alive
x-accel-buffering: no
transfer-encoding: chunked
Expected event stream output (one block per second):
event: update
id: 0
retry: 3000
data: counter=0

event: update
id: 1
retry: 3000
data: counter=1
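To check a captured stream programmatically rather than by eye, a small parser is often enough. This is a minimal sketch, assuming the curl output was saved as text. `parse_sse` is a hypothetical helper that handles only the basic field: value lines and comments, not every corner of the event-stream format.

```python
def parse_sse(text: str) -> list:
    """Split raw SSE text into one dict per event; a blank line ends an event."""
    events, current = [], {}
    for line in text.splitlines():
        if line == "":
            if current:
                events.append(current)
                current = {}
            continue
        if line.startswith(":"):
            continue  # comment line, for example a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional space after the colon
        if field == "data" and "data" in current:
            current["data"] += "\n" + value  # repeated data: lines are joined
        else:
            current[field] = value
    if current:
        events.append(current)  # keep a trailing event that lacks a final blank line
    return events


raw = "event: update\nid: 0\nretry: 3000\ndata: counter=0\n\n: keepalive\n\nevent: update\nid: 1\ndata: counter=1\n\n"
for event in parse_sse(raw):
    print(event)
```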
Unit-test stub with httpx and pytest-asyncio
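import httpx
import pytest
from httpx_sse import aconnect_sse  # pip install httpx-sse (async variant of connect_sse)

from main import app


@pytest.mark.asyncio
async def test_sse_streams_events():
    # Recent httpx releases dropped the app= shortcut; pass an ASGITransport instead.
    # Caveat: ASGITransport may buffer the whole response, so an endless generator
    # can hang here; bound the stream in tests or run against a live server.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        async with aconnect_sse(client, "GET", "/events") as event_source:
            events = []
            async for event in event_source.aiter_sse():
                events.append(event)
                if len(events) >= 3:
                    break
    assert len(events) == 3
    assert events[0].event == "update"
    assert events[0].id == "0"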
Monitor open connections
# Count established TCP connections on port 8000 (ss prints the state as ESTAB)
ss -tnp | grep :8000 | grep ESTAB | wc -l
For Prometheus metrics, instrument with prometheus-fastapi-instrumentator and create a gauge:
from prometheus_client import Gauge

sse_connections = Gauge("sse_active_connections", "Open SSE connections")

async def event_generator(request):
    sse_connections.inc()
    try:
        # ... your loop ...
        yield {"data": "hello"}
    finally:
        sse_connections.dec()
Verification Checklist
Frequently Asked Questions
Can I use a synchronous generator instead of async?
You can pass a synchronous generator to EventSourceResponse, but it runs in a thread pool, which blocks one thread per connection and limits concurrency. Use async generators (async def + yield) so all connections share the event loop's cooperative scheduler — essential once you have hundreds of concurrent SSE clients.
How do I send a comment (heartbeat) to keep the connection alive through idle periods?
Yield a dict with only a "comment" key: yield {"comment": "keepalive"}. sse-starlette serialises it as : keepalive\n\n, which the browser ignores but which prevents the connection from timing out at the TCP or proxy layer. A 15–30 second interval is typical; see HTTP Keep-Alive & Connection Lifecycle for tuning guidance.
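One way to produce such heartbeats only during idle periods is to wrap the event source in a timeout. The following is a sketch under stated assumptions: `with_heartbeat` is a hypothetical helper, events are assumed to arrive on an `asyncio.Queue`, and `None` is used as an illustrative end-of-stream sentinel.

```python
import asyncio


async def with_heartbeat(queue: asyncio.Queue, interval: float):
    """Yield queued events; yield a comment dict when idle for `interval` seconds."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=interval)
        except asyncio.TimeoutError:
            yield {"comment": "keepalive"}  # nothing arrived in time: emit a heartbeat
            continue
        if item is None:  # sentinel: the producer has finished
            return
        yield item


async def demo():
    queue = asyncio.Queue()

    async def producer():
        await asyncio.sleep(0.05)  # idle period longer than the heartbeat interval
        await queue.put({"data": "hello"})
        await queue.put(None)

    task = asyncio.create_task(producer())
    out = [event async for event in with_heartbeat(queue, interval=0.02)]
    await task
    return out


out = asyncio.run(demo())
print(out)
```

In production the interval would be in the 15–30 second range mentioned above; the short values here only keep the demonstration fast.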
Does EventSourceResponse work with FastAPI's dependency injection?
Yes. Inject dependencies normally into the route function and pass them into the generator via closure. For example, inject a database session with Depends(get_db) and pass db into the generator. Put db.close() inside the generator's finally block so the connection is released when the client disconnects, not after a request/response cycle.
Will this work behind Cloudflare or AWS ALB?
Cloudflare in HTTP/2 mode buffers SSE under its default Enterprise plan settings; set response_buffering: off in a Page Rule or Ruleset. AWS ALB has a 60-second idle timeout by default — raise it to at least 3600 s for SSE endpoints, or send heartbeat comments every 30 s to keep idle connections alive. Both proxies require your backend to emit chunked transfer encoding, which EventSourceResponse sets up automatically via ASGI.
⚡ Production Directives
- Set gunicorn --timeout 0 and --keep-alive 75; the default 30 s worker timeout silently kills live SSE connections.
- Always wrap the generator loop in try/finally and release DB connections, Redis subscriptions, and file handles in finally.
- Call await request.is_disconnected() before each blocking operation inside the generator; do not rely solely on catching CancelledError.
- Set proxy_buffering off and proxy_read_timeout 3600s in nginx; without these, clients receive events in bursts or never.
- Export a Prometheus gauge for active SSE connections; a slow memory leak from un-cleaned generators shows up as a monotonically rising gauge rather than an OOM.