Implementing SSE with Go Channels and http.Flusher

Part of Go Streaming Patterns for SSE.

Go’s net/http server buffers response writes: nothing reaches the client until the buffer fills, the handler returns, or the handler calls Flush explicitly. If a handler writes small text/event-stream events and keeps the connection open without flushing, clients see nothing until the handler exits. The fix is a select loop that reads from a per-client channel and calls http.Flusher.Flush() after each write. This guide walks through the exact implementation: correct headers, a channel-per-connection broker, and a context-aware select loop that tears down cleanly when the client disconnects.

Symptom and Developer Intent

You have a Go HTTP handler that produces SSE events. In local testing with curl --no-buffer, events arrive only when the handler exits, not incrementally. In the browser, EventSource fires onopen but no message events appear until the page reloads (at which point the server has already closed the connection). The desired behavior: events dispatched by the server appear at the client within milliseconds, while the connection remains open indefinitely.

A secondary symptom: when 500 concurrent clients connect, goroutine counts grow without bound because disconnected clients are never removed from the broadcaster’s subscriber map — goroutines block forever on channel sends to dead readers.

Root Cause Analysis

Why Go Buffers HTTP Writes

http.ResponseWriter is an interface; the implementation the net/http server supplies buffers writes in bufio.Writer layers a few kilobytes in size (the connection-level write buffer is 4 KB). Writes accumulate until the buffer fills or the handler returns, at which point net/http flushes the data and completes the response. For a standard JSON endpoint this is ideal, because a small response goes out in a single write. For a streaming endpoint it is fatal: events are small, so the buffer rarely fills and the client keeps waiting.

The http.Flusher interface is the escape hatch. When the underlying ResponseWriter supports it (the standard library’s HTTP/1.x and HTTP/2 writers do), asserting w.(http.Flusher) and calling Flush() pushes buffered data to the socket immediately. Flush() only moves data from Go’s application buffers into the kernel’s socket buffer; TCP then transmits it at its next opportunity. That delay is normally negligible because Go enables TCP_NODELAY on TCP connections by default, which disables Nagle’s algorithm.
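The effect of Flush() can be observed with a small self-contained program. This is a sketch rather than part of the guide's server: it assumes a throwaway httptest server, and the helper name firstLineBeforeReturn is made up for illustration. The handler writes one event, flushes, and then deliberately stays open; the client still receives the first line.

```go
package main

import (
    "bufio"
    "fmt"
    "net/http"
    "net/http/httptest"
)

// firstLineBeforeReturn (hypothetical helper) starts a test server whose
// handler writes one SSE event, flushes, and then blocks until released.
// It returns the first line the client reads while the handler is still open.
func firstLineBeforeReturn() (string, error) {
    release := make(chan struct{})
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/event-stream")
        fmt.Fprint(w, "data: one\n\n")
        if f, ok := w.(http.Flusher); ok {
            f.Flush() // without this call the client would wait for the handler to return
        }
        <-release // keep the handler (and the stream) open
    }))
    // Deferred calls run in reverse order: close the body, release the
    // handler, then shut the server down.
    defer srv.Close()
    defer close(release)

    resp, err := http.Get(srv.URL)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    return bufio.NewReader(resp.Body).ReadString('\n')
}

func main() {
    line, err := firstLineBeforeReturn()
    fmt.Printf("%q %v\n", line, err) // "data: one\n" <nil>
}
```

Removing the Flush() call in this sketch makes http.Get itself block, because not even the response headers leave the server until the handler returns.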

Why Per-Client Channels Are Necessary

A single global channel serializes all clients: only one reader gets each event. A broadcast pattern requires a channel per connected client so the broker can fan events out by ranging over a subscriber map and sending to each channel independently. Channels also decouple the broker’s event-dispatch goroutine from HTTP handler goroutines, preventing a slow or dead client from blocking the broker’s send loop (use buffered channels with a fixed capacity and drop-or-disconnect on overflow).
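The difference between one shared channel and one channel per client is easy to demonstrate in isolation. The following sketch uses plain int events and two hypothetical helpers, sharedDelivery and fanOutDelivery, that count how many deliveries two consumers receive in total under each design.

```go
package main

import "fmt"

// sharedDelivery sends n events on ONE channel read by two consumers and
// returns the total number of deliveries. Each event reaches only one reader.
func sharedDelivery(n int) int {
    shared := make(chan int)
    counts := make(chan int, 2)
    for i := 0; i < 2; i++ {
        go func() {
            seen := 0
            for range shared {
                seen++
            }
            counts <- seen
        }()
    }
    for i := 0; i < n; i++ {
        shared <- i
    }
    close(shared)
    return <-counts + <-counts
}

// fanOutDelivery gives each of two consumers its own buffered channel and
// copies every event to both, which is what a broadcast needs.
func fanOutDelivery(n int) int {
    subs := []chan int{make(chan int, n), make(chan int, n)}
    for i := 0; i < n; i++ {
        for _, ch := range subs {
            ch <- i
        }
    }
    total := 0
    for _, ch := range subs {
        close(ch)
        for range ch {
            total++
        }
    }
    return total
}

func main() {
    fmt.Println(sharedDelivery(10), fanOutDelivery(10)) // 10 20
}
```

With the shared channel the two consumers split the ten events between them; with per-client channels each consumer sees all ten.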

Why ctx.Done() Is the Correct Disconnect Signal

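For incoming server requests, r.Context() is cancelled when the client’s connection closes, when the request is cancelled (HTTP/2), or when ServeHTTP returns. The older http.CloseNotifier interface does the same job but has been deprecated since Go 1.11, and detecting a disconnect by waiting for a write error only works when there is something to write, which on a quiet stream can take until the next heartbeat. A select on ctx.Done() wakes the handler as soon as the server notices the closed connection, with no polling.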

Step-by-Step Resolution

Step 1 — Set the Required SSE Headers

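Before writing any data, set the response headers and call Flush() once so they reach the client immediately. Content-Type: text/event-stream is the one header EventSource requires; Cache-Control: no-cache and Connection: keep-alive are conventional safeguards against caching and against HTTP/1.1 intermediaries that close connections. For CORS-protected endpoints, add Access-Control-Allow-Origin here as well.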

func sseHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    // Optional: tell nginx / Caddy not to buffer this response
    w.Header().Set("X-Accel-Buffering", "no")
    w.WriteHeader(http.StatusOK)
    flusher.Flush() // send headers to client immediately

    // ... event loop goes here (see Step 3) ...
}

The X-Accel-Buffering: no response header is honoured by nginx and switches off proxy buffering for this one response (the per-response equivalent of proxy_buffering off). Without it the reverse proxy accumulates the stream in its own buffer, a common reason events arrive in bursts rather than individually. See buffer management and chunked transfer encoding for the full proxy-buffering picture.

Step 2 — Build a Concurrent-Safe Broker

The broker holds the subscriber map, registers and deregisters clients, and fans out events. A sync.RWMutex guards the map, so Subscribe, Unsubscribe, and Broadcast are safe to call from different goroutines.

package main

import (
    "fmt"      // used by writeEvent in Step 3
    "net/http" // used by the handler in Step 3
    "sync"
)

// Event is the unit dispatched to clients.
type Event struct {
    ID   string // optional; sent as "id: <ID>\n"
    Type string // optional; sent as "event: <Type>\n"
    Data string // required; sent as "data: <Data>\n"
}

// Broker manages active SSE clients.
type Broker struct {
    mu          sync.RWMutex
    subscribers map[chan Event]struct{}
}

func NewBroker() *Broker {
    return &Broker{subscribers: make(map[chan Event]struct{})}
}

// Subscribe creates a buffered channel for one client and registers it.
// Buffer of 64 prevents a slow client from blocking Broadcast.
func (b *Broker) Subscribe() chan Event {
    ch := make(chan Event, 64)
    b.mu.Lock()
    b.subscribers[ch] = struct{}{}
    b.mu.Unlock()
    return ch
}

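// Unsubscribe removes the channel from the map and closes it. The handler
// calls this via defer when it exits; the map check makes a second call a
// no-op instead of a double-close panic.
func (b *Broker) Unsubscribe(ch chan Event) {
    b.mu.Lock()
    defer b.mu.Unlock()
    if _, ok := b.subscribers[ch]; ok {
        delete(b.subscribers, ch)
        close(ch)
    }
}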

// Broadcast sends an event to all active subscribers.
// If a subscriber's buffer is full, the event is dropped for that subscriber;
// blocking instead would stall the broadcaster for everyone.
func (b *Broker) Broadcast(e Event) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    for ch := range b.subscribers {
        select {
        case ch <- e:
        default:
            // subscriber too slow; skip rather than block
        }
    }
}

The choice of buffer size (64 here) controls how many undelivered events a slow client may accumulate before drops begin. Tune it based on your event rate and acceptable memory per connection: at roughly 512 bytes per event, 64 queued events cost about 32 KB per client.
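To see the drop behaviour and the memory arithmetic in isolation, here is a small sketch. The helpers trySend and bufferBytes are illustrative names, not part of the broker; trySend uses the same non-blocking select as Broadcast, and the buffer is shrunk to 2 so that drops show up quickly.

```go
package main

import "fmt"

// trySend performs the same non-blocking send that Broadcast uses:
// it reports false instead of blocking when the buffer is full.
func trySend(ch chan string, msg string) bool {
    select {
    case ch <- msg:
        return true
    default:
        return false
    }
}

// bufferBytes estimates the worst-case queued payload for one client,
// given the channel capacity and an assumed average event size.
func bufferBytes(capacity, avgEventBytes int) int {
    return capacity * avgEventBytes
}

func main() {
    ch := make(chan string, 2) // tiny buffer; nobody is reading
    delivered, dropped := 0, 0
    for i := 0; i < 5; i++ {
        if trySend(ch, fmt.Sprintf("event-%d", i)) {
            delivered++
        } else {
            dropped++
        }
    }
    fmt.Println(delivered, dropped)   // 2 3
    fmt.Println(bufferBytes(64, 512)) // 32768
}
```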

Step 3 — Write the select Loop in the Handler

Subscribe, defer cleanup, then block in a select that races incoming events against context cancellation.

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("X-Accel-Buffering", "no")
    w.WriteHeader(http.StatusOK)
    flusher.Flush()

    ch := b.Subscribe()
    defer b.Unsubscribe(ch) // always runs; removes dead client

    ctx := r.Context()
    for {
        select {
        case <-ctx.Done():
            // Client disconnected or server shut down.
            return
        case event, ok := <-ch:
            if !ok {
                // Channel was closed by Unsubscribe (e.g. during shutdown).
                return
            }
            writeEvent(w, event)
            flusher.Flush()
        }
    }
}

// writeEvent formats one SSE message onto the ResponseWriter.
func writeEvent(w http.ResponseWriter, e Event) {
    if e.ID != "" {
        fmt.Fprintf(w, "id: %s\n", e.ID)
    }
    if e.Type != "" {
        fmt.Fprintf(w, "event: %s\n", e.Type)
    }
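    // Multi-line data: emit one "data:" line per payload line, as the SSE
    // format requires (add "strings" to the import block).
    for _, line := range strings.Split(e.Data, "\n") {
        fmt.Fprintf(w, "data: %s\n", line)
    }
    fmt.Fprint(w, "\n") // blank line terminates the event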
}

The blank line after the last data: line (the final \n\n) terminates the event in the text/event-stream wire format. Omitting it is a common wire-format bug: the client keeps buffering while it waits for the event boundary.
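Looking at the stream from the receiving side makes the role of the blank line clearer. The parser below is a deliberately simplified sketch of what an SSE client does (the name parseEvents is made up, and it ignores id, event, and retry fields): it collects data lines and only dispatches an event when it reaches a blank line.

```go
package main

import (
    "bufio"
    "fmt"
    "strings"
)

// parseEvents is a simplified reader for the text/event-stream format.
// It joins consecutive "data:" lines with "\n" and dispatches an event at
// each blank line. Comment lines (":...") and other fields are ignored.
func parseEvents(stream string) []string {
    var events, data []string
    sc := bufio.NewScanner(strings.NewReader(stream))
    for sc.Scan() {
        line := sc.Text()
        switch {
        case line == "":
            if len(data) > 0 {
                events = append(events, strings.Join(data, "\n"))
                data = nil
            }
        case strings.HasPrefix(line, "data:"):
            value := strings.TrimPrefix(line, "data:")
            data = append(data, strings.TrimPrefix(value, " ")) // drop one leading space
        }
    }
    // Data lines that were never followed by a blank line are not dispatched.
    return events
}

func main() {
    stream := "data: a\ndata: b\n\n: keepalive\n\ndata: c\n"
    fmt.Printf("%q\n", parseEvents(stream)) // ["a\nb"]
}
```

The trailing "data: c" line never becomes an event because no blank line follows it, which is exactly what a browser does with an unterminated event.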

Step 4 — Wire a Heartbeat to Keep the Connection Alive

Proxies and load balancers commonly close idle connections after 30–90 seconds, depending on their configuration. Send a comment line (: keepalive\n\n) on a ticker so the connection never looks idle. EventSource ignores comments, but they still reset the proxy’s idle timer.

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // ... headers, subscribe, defer as above ...

    ticker := time.NewTicker(25 * time.Second)
    defer ticker.Stop()

    ctx := r.Context()
    for {
        select {
        case <-ctx.Done():
            return
        case event, ok := <-ch:
            if !ok {
                return
            }
            writeEvent(w, event)
            flusher.Flush()
        case <-ticker.C:
            fmt.Fprintf(w, ": keepalive\n\n")
            flusher.Flush()
        }
    }
}

Step 5 — Register the Handler and Run the Server

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

func main() {
    broker := NewBroker()

    // Simulate an upstream event source (replace with real business logic).
    go func() {
        id := 0
        for {
            time.Sleep(2 * time.Second)
            id++
            broker.Broadcast(Event{
                ID:   fmt.Sprintf("%d", id),
                Type: "update",
                Data: fmt.Sprintf(`{"seq":%d,"ts":"%s"}`, id, time.Now().UTC().Format(time.RFC3339)),
            })
        }
    }()

    mux := http.NewServeMux()
    mux.Handle("/events", broker)
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/html")
        fmt.Fprint(w, `<!doctype html><script>
var es = new EventSource('/events');
es.addEventListener('update', e => console.log(JSON.parse(e.data)));
</script>`)
    })

    srv := &http.Server{
        Addr:         ":8080",
        Handler:      mux,
        ReadTimeout:  5 * time.Second,
        // WriteTimeout must be 0 (no timeout) for long-lived SSE streams,
        // or be extended per write from inside the handler
        // (http.ResponseController.SetWriteDeadline, Go 1.20+).
        WriteTimeout: 0,
        IdleTimeout:  120 * time.Second,
    }
    log.Fatal(srv.ListenAndServe())
}

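WriteTimeout: 0 is intentional for SSE handlers, but note that it applies to every route on this server. A finite WriteTimeout cuts off each response once that duration has elapsed, however healthy the stream is. If you need a write deadline, enforce it per stream inside the handler, for example by extending the deadline before each write with http.ResponseController.SetWriteDeadline (Go 1.20+); a context with a deadline can additionally cap the total lifetime of a stream.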
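One way to keep a write deadline without killing healthy streams is to push the deadline forward before every write. The sketch below assumes Go 1.20 or newer for http.ResponseController; the helper names streamWithDeadline and fetch, and the 10-second value, are illustrative only.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "time"
)

// streamWithDeadline (hypothetical helper) extends the write deadline before
// each write, so a stalled client is cut off after perWrite while a healthy
// stream can run indefinitely.
func streamWithDeadline(w http.ResponseWriter, msgs []string, perWrite time.Duration) error {
    rc := http.NewResponseController(w)
    for _, m := range msgs {
        if err := rc.SetWriteDeadline(time.Now().Add(perWrite)); err != nil {
            return err // e.g. the writer does not support deadlines
        }
        if _, err := fmt.Fprintf(w, "data: %s\n\n", m); err != nil {
            return err
        }
        if err := rc.Flush(); err != nil {
            return err
        }
    }
    return nil
}

// fetch runs the helper behind a throwaway test server and returns the body.
func fetch() (string, error) {
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/event-stream")
        _ = streamWithDeadline(w, []string{"a", "b"}, 10*time.Second)
    }))
    defer srv.Close()

    resp, err := http.Get(srv.URL)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    return string(body), err
}

func main() {
    body, err := fetch()
    fmt.Printf("%q %v\n", body, err) // "data: a\n\ndata: b\n\n" <nil>
}
```

In the real handler the same two calls (SetWriteDeadline, then write and flush) would sit inside the select loop, covering both event writes and heartbeats.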

Validation and Monitoring

Verify with curl

# -N (--no-buffer) disables curl's own output buffering.
curl -N -H "Accept: text/event-stream" http://localhost:8080/events

Expected output (every 2 seconds):

id: 1
event: update
data: {"seq":1,"ts":"2026-06-21T12:00:00Z"}

id: 2
event: update
data: {"seq":2,"ts":"2026-06-21T12:00:02Z"}

Check Active Connections and Goroutine Count

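# Goroutine dump via pprof. The blank import _ "net/http/pprof" registers its
# handlers on http.DefaultServeMux only; with the custom mux from Step 5,
# mount them explicitly, e.g. mux.HandleFunc("/debug/pprof/", pprof.Index).
curl -s 'http://localhost:8080/debug/pprof/goroutine?debug=1' | head -20

To track the connected subscriber count, expose a counter via expvar (or a /metrics endpoint):

import "expvar"

var activeClients = expvar.NewInt("sse_active_clients")

// In Subscribe():
activeClients.Add(1)

// In Unsubscribe():
activeClients.Add(-1)

// expvar also registers on http.DefaultServeMux; with a custom mux add:
// mux.Handle("/debug/vars", expvar.Handler())

Then query the counter:

curl -s http://localhost:8080/debug/vars | jq .sse_active_clients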

Unit Test: Flush Is Called on Each Event

package main

import (
    "context"
    "net/http/httptest"
    "testing"
    "time"
)

// flushRecorder wraps httptest.ResponseRecorder and counts Flush calls.
type flushRecorder struct {
    *httptest.ResponseRecorder
    flushCount int
}

func (f *flushRecorder) Flush() { f.flushCount++ }

func TestFlushCalledPerEvent(t *testing.T) {
    broker := NewBroker()
    rec := &flushRecorder{ResponseRecorder: httptest.NewRecorder()}

    // Run handler in background; cancel after one event.
    req := httptest.NewRequest("GET", "/events", nil)
    ctx, cancel := context.WithCancel(req.Context())
    req = req.WithContext(ctx)

    done := make(chan struct{})
    go func() {
        broker.ServeHTTP(rec, req)
        close(done)
    }()

    time.Sleep(20 * time.Millisecond) // handler starts
    broker.Broadcast(Event{Data: "hello"})
    time.Sleep(20 * time.Millisecond) // event delivered
    cancel()
    <-done

    if rec.flushCount < 2 { // at least header flush + event flush
        t.Fatalf("expected >=2 flushes, got %d", rec.flushCount)
    }
}

For guidance on cleanly draining connections during restarts, see Graceful Shutdown for Go SSE Servers.

Production Checklist

⚡ Production Directives

  • Always verify http.Flusher support at handler entry — middleware that wraps ResponseWriter (e.g. gzip, logging) may hide it; unwrap or skip compression for SSE routes.
  • Set WriteTimeout: 0 on the http.Server for SSE endpoints; a finite timeout silently kills long-lived streams.
  • Buffer each client channel (minimum 16 events) and drop on overflow rather than blocking — one lagging client must never stall the broadcaster goroutine.
  • Send an SSE comment (: keepalive) every 25 s; this resets proxy idle timers without triggering EventSource message handlers.
  • Track goroutine count and subscriber map size via expvar or Prometheus; a goroutine leak here means Unsubscribe is not being called on disconnect.

Frequently Asked Questions

Why does my SSE handler work locally but events arrive in batches behind nginx?

nginx buffers proxied responses by default. Add proxy_buffering off; to the nginx location block, or set the X-Accel-Buffering: no response header from Go. The header approach is portable and does not require nginx config changes per deploy.

Can I use http.Flusher with HTTP/2?

Yes. Go's net/http HTTP/2 implementation wraps the stream writer in a type that also implements http.Flusher. The cast succeeds and Flush() sends a DATA frame immediately. HTTP/2 multiplexing means SSE connections share a single TCP connection per host, which improves browser connection-limit behaviour compared to HTTP/1.1 SSE.

What happens when the channel buffer fills up?

The select in Broadcast hits the default branch and skips that subscriber for that event. The client misses the event. If this happens frequently, increase the buffer, slow down the publisher, or implement client-side replay using Last-Event-ID. See Event ID and Retry Mechanism Design for replay patterns.
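As a rough sketch of the replay idea, assuming the server keeps a bounded in-memory history of recent events: on reconnect the browser sends the last ID it saw in the Last-Event-ID request header, and the handler can resend whatever followed it before entering the live loop. The eventsAfter helper and the trimmed-down Event type below are illustrative, not part of the broker above.

```go
package main

import "fmt"

// Event is a trimmed-down copy of the guide's Event type for this sketch.
type Event struct {
    ID   string
    Data string
}

// eventsAfter returns the events that follow lastID in history (oldest
// first). If lastID is empty or no longer in history, the whole history is
// returned so the client at least receives everything still retained.
func eventsAfter(history []Event, lastID string) []Event {
    if lastID != "" {
        for i, e := range history {
            if e.ID == lastID {
                return history[i+1:]
            }
        }
    }
    return history
}

func main() {
    history := []Event{{"1", "a"}, {"2", "b"}, {"3", "c"}}

    // In a handler, lastID would come from r.Header.Get("Last-Event-ID").
    missed := eventsAfter(history, "1")
    fmt.Println(len(missed), missed[0].ID, missed[1].ID) // 2 2 3
}
```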

Is it safe to call broker.Broadcast from multiple goroutines?

Yes, as written. Broadcast acquires an RLock, which allows multiple concurrent broadcasts to proceed simultaneously while Subscribe/Unsubscribe take a full write lock. Sends to individual client channels are goroutine-safe because channels are safe for concurrent use.

How do I send the retry directive to control reconnect timing?

Emit fmt.Fprintf(w, "retry: 5000\n\n") right after the headers, before the first event. The value is in milliseconds, so the browser's EventSource waits about 5 s before reconnecting after a dropped connection. For production, 3000–10000 ms is typical. See Setting the retry Interval in SSE Streams for a full discussion.