Immediate indicators of unbounded heap growth in persistent HTTP streams:
runtime.NumGoroutine() deltas post-disconnect

The objective is to enforce deterministic allocation patterns for persistent streams, eliminating heap fragmentation and preventing memory retention after client teardown.
By default, net/http buffers each connection's response through a 4KB bufio.Writer, and that cost compounds across connections during backpressure. Unflushed bufio.Writer instances retain serialized event payloads in memory until they are flushed or the connection closes. Concurrent publisher goroutines holding references to disconnected clients cause silent memory leaks. Writes larger than the kernel send buffer (SO_SNDBUF) cannot be handed off in one call, so the unsent remainder stays on the Go heap while the write blocks. Aligning buffer pools with TCP window constraints requires understanding Buffer Management & Chunked Transfer Encoding before implementing custom allocation strategies.
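Buffer pooling (sync.Pool)
Replace per-event []byte allocations with a sync.Pool of reusable buffers. This reduces GC pressure from repeated 4KB–8KB allocations. The pool fills lazily through New, and the garbage collector may drop idle entries, so treat it as a cache rather than a fixed reserve.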
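// bufPool hands out reusable 8KB buffers for event payloads.
var bufPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 8192)
	},
}

// acquireBuffer returns a pooled buffer truncated to zero length.
func acquireBuffer() []byte {
	return bufPool.Get().([]byte)[:0]
}

// releaseBuffer returns b to the pool. The caller must not use b afterwards.
func releaseBuffer(b []byte) {
	bufPool.Put(b[:cap(b)])
}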
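One refinement worth considering: putting a bare []byte into a sync.Pool converts the slice header to an interface value, which itself allocates on each Put. The sketch below stores *[]byte instead and declines to pool buffers that have grown very large. The names ptrPool, getBuf, putBuf and the 64KB ceiling are assumptions chosen for illustration, not part of the code above.

```go
package main

import "sync"

const (
	defaultBufSize = 8192      // matches the 8KB capacity used above
	maxPooledSize  = 64 * 1024 // hypothetical ceiling; tune to your payloads
)

// ptrPool stores *[]byte so that Put does not allocate a new interface
// value for the slice header on every call.
var ptrPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, defaultBufSize)
		return &b
	},
}

// getBuf returns a pooled buffer truncated to zero length.
func getBuf() *[]byte {
	bp := ptrPool.Get().(*[]byte)
	*bp = (*bp)[:0]
	return bp
}

// putBuf recycles bp unless it grew past maxPooledSize, in which case it
// is left for the garbage collector and putBuf reports false.
func putBuf(bp *[]byte) bool {
	if cap(*bp) > maxPooledSize {
		return false
	}
	ptrPool.Put(bp)
	return true
}
```

Callers append through the pointer (`*bp = append(*bp, ...)`) and hand the same pointer back to putBuf once the event has been written.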
http.Flusher compliance
Assert the interface immediately, before any headers are written. Invoke Flush() after every complete event write (the data: line plus its terminating blank line) to push bytes to the kernel send buffer and trigger chunked transfer boundaries.
func streamHandler(w http.ResponseWriter, r *http.Request) {
	// Assert streaming support before any header or body bytes are written.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// Event loop writes directly to w; payload is a placeholder value here.
	payload := "ping"
	fmt.Fprintf(w, "data: %s\n\n", payload)
	flusher.Flush()
}
Prevent goroutine retention during client disconnects by using a fixed-capacity channel. Drop events when the client cannot consume them.
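// clientBufferSize is the capacity to use when creating client.ch.
const clientBufferSize = 64

type client struct {
	ch chan []byte
}

// send queues event without blocking the publisher.
func (c *client) send(event []byte) {
	select {
	case c.ch <- event:
	default:
		// Backpressure: drop event to prevent heap growth
		releaseBuffer(event)
	}
}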
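Silent drops are hard to diagnose, so it can help to count them. The sketch below is a hypothetical variant of the client type (boundedClient, newBoundedClient and trySend are illustrative names) that reports whether each event was accepted and keeps an atomic drop counter. It assumes Go 1.19 or later for atomic.Int64.

```go
package main

import "sync/atomic"

// boundedClient is a hypothetical variant of the client type above that
// also counts how many events it dropped.
type boundedClient struct {
	ch      chan []byte
	dropped atomic.Int64
}

// newBoundedClient creates a client whose queue holds at most capacity events.
func newBoundedClient(capacity int) *boundedClient {
	return &boundedClient{ch: make(chan []byte, capacity)}
}

// trySend queues event without blocking and reports whether it was accepted.
func (c *boundedClient) trySend(event []byte) bool {
	select {
	case c.ch <- event:
		return true
	default:
		c.dropped.Add(1)
		return false
	}
}
```

A publisher can release the buffer whenever trySend returns false, and the counter can be exported as a metric to see which clients fall behind.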
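Force buffer release on stale or zombie connections. Configure http.Server with strict read and idle timeouts, but leave WriteTimeout at zero for streaming endpoints: it bounds the entire response, so a 30-second value would terminate every stream roughly 30 seconds after the request is read. Apply a per-write deadline inside the handler instead (http.ResponseController, Go 1.20+).
srv := &http.Server{
	Addr:         ":8080",
	Handler:      mux,
	ReadTimeout:  10 * time.Second,
	WriteTimeout: 0,                // No server-wide write deadline; set one per write in the handler
	IdleTimeout:  60 * time.Second, // Closes keep-alive connections after inactivity
}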
Wrap event formatting in a reusable bytes.Buffer with explicit Reset(). The scratch buffer is reused across calls, and the returned copy is taken from the buffer pool so that releaseBuffer recycles it later. A bytes.Buffer is not safe for concurrent use, so give each publisher goroutine its own.
// eventBuf is scratch space for a single publisher goroutine.
var eventBuf bytes.Buffer

func formatEvent(buf *bytes.Buffer, id, eventType, data string) []byte {
	buf.Reset()
	buf.WriteString("id: ")
	buf.WriteString(id)
	buf.WriteString("\nevent: ")
	buf.WriteString(eventType)
	buf.WriteString("\ndata: ")
	buf.WriteString(data)
	buf.WriteString("\n\n")
	// Return a pooled copy to avoid slice aliasing with the underlying buffer
	return append(acquireBuffer(), buf.Bytes()...)
}
Tie buffer cleanup directly to the request context. Guarantee deterministic teardown when the client disconnects or the server shuts down. Implement connection lifecycle hooks per Backend Stream Generation & Connection Management to guarantee buffer cleanup on context.Done().
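ctx := r.Context()
go func() {
	<-ctx.Done()
	// Drain and release channel buffers.
	for {
		select {
		case event := <-c.ch:
			releaseBuffer(event)
		default:
			// Close only after c has been removed from every publisher:
			// a send on a closed channel panics, even inside select.
			close(c.ch)
			return
		}
	}
}()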
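Closing a client channel while publishers may still send to it causes a panic, so teardown has to be coordinated with the publisher. One possible arrangement, sketched below, is a registry whose mutex makes publish and unregister mutually exclusive. The hub type and its methods are hypothetical and not taken from the code above.

```go
package main

import "sync"

// hub is a hypothetical registry; the lock makes publish and unregister
// mutually exclusive, so a send never targets a closed channel.
type hub struct {
	mu      sync.Mutex
	clients map[chan []byte]struct{}
}

func newHub() *hub {
	return &hub{clients: make(map[chan []byte]struct{})}
}

// register adds a client channel with the given capacity.
func (h *hub) register(capacity int) chan []byte {
	ch := make(chan []byte, capacity)
	h.mu.Lock()
	h.clients[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// unregister removes ch, closes it exactly once, and discards whatever is
// still queued. It returns the number of discarded events.
func (h *hub) unregister(ch chan []byte) (discarded int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.clients[ch]; !ok {
		return 0
	}
	delete(h.clients, ch)
	close(ch)
	for range ch {
		discarded++
	}
	return discarded
}

// publish offers event to every registered client without blocking and
// returns how many clients accepted it.
func (h *hub) publish(event []byte) (delivered int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.clients {
		select {
		case ch <- event:
			delivered++
		default: // slow client: drop
		}
	}
	return delivered
}
```

A handler would typically call register on entry and `defer h.unregister(ch)`, so cleanup runs on every exit path once the request context is done. If events come from a pool, release them inside the drain loop instead of only counting them.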
Execute the following under sustained load (e.g., wrk -t12 -c5000 -d60s http://localhost:8080/stream):
go tool pprof -http=:6060 http://localhost:8080/debug/pprof/heap
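Verify that RSS plateaus after the initial connection burst. Confirm that pooled buffers are being reused: in the heap profile, the make([]byte, ...) call inside bufPool.New should stay small and stable rather than growing with event volume.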
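Keep GC pause time below 5%, measured via runtime.ReadMemStats(). PauseTotalNs is cumulative since process start, so judge it against uptime rather than in isolation.
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Printf("GC pause total: %v", time.Duration(m.PauseTotalNs))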
Record Flush() execution time in a histogram.
var flushLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "sse_flush_duration_seconds",
		Help:    "Time spent in http.Flusher.Flush per event.",
		Buckets: prometheus.ExponentialBuckets(0.0001, 2, 10), // 100µs to ~51ms
	}, []string{"status"},
)
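Compare runtime.NumGoroutine() before and after a batch of disconnects.
before := runtime.NumGoroutine()
// ... trigger 1000 client disconnects and give handlers time to exit ...
after := runtime.NumGoroutine()
if after > before {
	log.Fatalf("Goroutine leak detected: %d extra goroutines", after-before)
}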
Verify that Transfer-Encoding: chunked framing aligns with Flush() calls (HTTP/1.1 only; HTTP/2 has no chunked encoding and uses DATA frames instead). Validate memory stability during SIGTERM and abrupt TCP RST injection.