Managing memory buffers in Go streaming servers

Incident Triage & Symptoms

Immediate indicators of unbounded heap growth in persistent HTTP streams: RSS that keeps climbing instead of plateauing after the initial connection burst, steadily rising cumulative GC pause time, and goroutine counts that never drop back after client disconnects.

The objective is to enforce deterministic allocation patterns for persistent streams, eliminating heap fragmentation and preventing memory retention after client teardown.

Root Cause Analysis

The default net/http response writer buffers output in a 4 KB bufio.Writer per connection; under backpressure these buffers compound across thousands of persistent streams. Unflushed bufio.Writer instances retain serialized event payloads in memory until the connection closes. Publisher goroutines that keep references to disconnected clients leak memory silently. Writes far larger than the kernel send buffer (SO_SNDBUF) cannot drain promptly, forcing the runtime into large, long-lived allocations that fragment the heap. Aligning buffer pools with TCP window constraints requires understanding Buffer Management & Chunked Transfer Encoding before implementing custom allocation strategies.

Step-by-Step Resolution

1. Fixed-Size Buffer Pooling (sync.Pool)

Replace per-event []byte allocations with a pre-warmed pool. This sharply reduces GC pressure from repeated 4 KB–8 KB allocations.

var bufPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 0, 8192)
    },
}

func acquireBuffer() []byte {
    return bufPool.Get().([]byte)[:0]
}

func releaseBuffer(b []byte) {
    bufPool.Put(b[:cap(b)])
}

2. Enforce http.Flusher Compliance

Assert the interface immediately. Invoke Flush() after every data: line write to push bytes to the kernel send buffer and trigger chunked transfer boundaries.

func streamHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Event loop writes directly to w
    fmt.Fprintf(w, "data: %s\n\n", payload)
    flusher.Flush()
}

3. Bounded Channels with Non-Blocking Drops

Prevent goroutine retention during client disconnects by using a fixed-capacity channel. Drop events when the client cannot consume them.

const clientBufferSize = 64

type client struct {
    ch chan []byte
}

func (c *client) send(event []byte) {
    select {
    case c.ch <- event:
    default:
        // Backpressure: drop event to prevent heap growth
        releaseBuffer(event)
    }
}

4. Server Timeout Configuration

Force buffer release on stale or zombie connections by configuring http.Server with strict timeouts. One caveat: WriteTimeout covers the entire response, so any finite value will cut off long-lived streams mid-flight. Disable it for streaming endpoints and enforce per-write deadlines inside the handler instead.

srv := &http.Server{
    Addr:         ":8080",
    Handler:      mux,
    ReadTimeout:  10 * time.Second,
    WriteTimeout: 0,                // must be disabled for persistent streams; use per-write deadlines
    IdleTimeout:  60 * time.Second, // Closes keep-alive connections after inactivity
}

5. Low-Allocation Event Serialization

Wrap event formatting in a reusable bytes.Buffer with explicit Reset(). Intermediate formatting then reuses one backing array; the only per-event allocation is the final detached copy.

// One bytes.Buffer per publisher goroutine; sharing it across goroutines would race.
var eventBuf bytes.Buffer

func formatEvent(buf *bytes.Buffer, id, eventType, data string) []byte {
    buf.Reset()
    buf.WriteString("id: ")
    buf.WriteString(id)
    buf.WriteString("\nevent: ")
    buf.WriteString(eventType)
    buf.WriteString("\ndata: ")
    buf.WriteString(data)
    buf.WriteString("\n\n")
    // Return a copy to avoid slice aliasing with the underlying buffer
    out := make([]byte, buf.Len())
    copy(out, buf.Bytes())
    return out
}

6. Lifecycle Hooks & Context Cancellation

Tie buffer cleanup directly to the request context so teardown is deterministic when the client disconnects or the server shuts down. Implement connection lifecycle hooks per Backend Stream Generation & Connection Management to release buffers on context.Done().

ctx := r.Context()
go func() {
    <-ctx.Done()
    // Publishers must already be unregistered before this runs: a send on
    // a closed channel panics, and the non-blocking send in client.send
    // does not guard against that.
    for {
        select {
        case event := <-c.ch:
            releaseBuffer(event)
        default:
            close(c.ch)
            return
        }
    }
}()

Validation & Monitoring

Execute the following under sustained load (e.g., wrk -t12 -c5000 -d60s http://localhost:8080/stream):

  1. Heap Profiling: Capture allocation hotspots (requires importing net/http/pprof).

go tool pprof -http=:6060 http://localhost:8080/debug/pprof/heap

Verify that RSS plateaus after the initial connection burst, and that buffer traffic is served from sync.Pool rather than showing up as fresh runtime.mallocgc allocations.

  2. GC Pause Targets: Keep cumulative GC pause time below 5% of wall-clock, measured via runtime.ReadMemStats().

var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Printf("GC Pause: %v", time.Duration(m.PauseTotalNs))

  3. Flush Latency Tracking: Expose Prometheus histograms for Flush() execution time.

var flushLatency = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "sse_flush_duration_seconds",
        Buckets: prometheus.ExponentialBuckets(0.0001, 2, 10),
    }, []string{"status"},
)

  4. Goroutine Leak Detection: Monitor goroutine-count deltas during forced disconnects.

before := runtime.NumGoroutine()
// ... trigger 1000 client disconnects ...
after := runtime.NumGoroutine()
if after > before {
    log.Fatalf("Goroutine leak detected: %d leaked", after-before)
}

  5. Chunk Boundary Verification: Inspect raw network traffic. Confirm that Transfer-Encoding: chunked boundaries align with Flush() calls. Validate memory stability during SIGTERM and abrupt TCP RST injection.