Clients receive duplicate payloads or miss critical updates following network interruptions or server restarts. The Last-Event-ID header fails to trigger accurate stream resumption because event identifiers are non-sequential, randomly generated, or reset on deployment. The objective is to implement a strictly increasing, collision-free ID sequence that aligns with the SSE specification for deterministic state recovery. Proper sequencing is foundational to Backend Stream Generation & Connection Management and prevents cascading state corruption across real-time consumers.
Monotonicity breaks when IDs come from UUIDs, floating-point timestamps, or non-atomic counters. UUIDs carry no ordering, timestamps can collide or move backwards when the clock is adjusted, and non-atomic counters can issue the same value twice under concurrency. The result is collisions, out-of-order IDs, or sequence gaps that make it impossible to reconstruct the exact stream state. The SSE protocol uses the id: field to track progression; without a deterministic, ascending sequence, reconnection logic falls back to full stream replay or loses data silently. ID allocation must be atomic and tied to payload dispatch so that the sequence stays intact across process boundaries and deployment cycles.
Use a centralized counter for distributed nodes, or language-native atomics for single-process deployments.
Redis (Distributed)
# Initialize the sequence only if the key does not exist yet,
# so a redeploy does not reset it
redis-cli SET sse_event_seq 0 NX
# Atomic increment per event
redis-cli INCR sse_event_seq
Node.js (Single-Node)
// Node.js runs JavaScript on a single thread, so incrementing a plain
// counter cannot be interleaved with another callback.
let eventCounter = 0;
function getNextId() {
  eventCounter += 1;
  return eventCounter;
}
Strictly adhere to the id:\ndata:\n\n frame structure. Do not append whitespace, carriage returns, or newline characters inside the ID value.
function formatSSEFrame(id, payload) {
  return `id: ${id}\ndata: ${JSON.stringify(payload)}\n\n`;
}
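The rule about the ID value can be enforced in code rather than left to convention. The sketch below is one possible approach, assuming IDs are positive integers that fit in a JavaScript safe integer; `formatCheckedSSEFrame` is a hypothetical name used for illustration only.

```javascript
// Hypothetical helper: validates the ID before building the frame.
// Assumes IDs are positive integers within Number.MAX_SAFE_INTEGER.
function formatCheckedSSEFrame(id, payload) {
  if (!Number.isSafeInteger(id) || id <= 0) {
    // Rejects strings, floats, and anything that could carry a
    // newline or other stray characters into the id field.
    throw new RangeError(`invalid SSE event id: ${JSON.stringify(id)}`);
  }
  // JSON.stringify escapes line breaks inside strings, so the data
  // field stays on a single line. Undefined payloads become "null".
  const data = JSON.stringify(payload) ?? 'null';
  return `id: ${id}\ndata: ${data}\n\n`;
}

// Usage: a payload containing a newline still yields a single data line.
const exampleFrame = formatCheckedSSEFrame(7, { msg: 'a\nb' });
console.log(JSON.stringify(exampleFrame));
```

Rejecting a bad ID at format time keeps a malformed frame from ever reaching the response stream, at the cost of an exception the caller has to handle.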
Prevent sequence reset by flushing the current counter to durable storage during graceful termination.
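import fs from 'fs/promises';
import process from 'process';

// redisClient is assumed to be an already-connected Redis client.
process.on('SIGTERM', async () => {
  const lastId = await redisClient.get('sse_event_seq');
  // get() resolves to null when the key is missing; persist 0 in that case.
  await fs.writeFile('/var/run/sse_last_id.txt', String(lastId ?? 0));
  await redisClient.quit();
  process.exit(0);
});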
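Persisting the counter only helps if the next process reads it back. The following is a minimal sketch of that restore step, assuming the persisted file holds a decimal integer; `resolveStartId` is a hypothetical helper, and the caller is expected to pass in the file contents and the value currently held by the counter store. Taking the larger of the two keeps the sequence from moving backwards.

```javascript
// Hypothetical helper: picks the counter value to resume from.
// persistedText: contents of the persisted file, or null if it is missing.
// liveValue: whatever the counter store currently reports, or null.
function resolveStartId(persistedText, liveValue) {
  const persisted = Number.parseInt(String(persistedText ?? ''), 10);
  const live = Number.parseInt(String(liveValue ?? ''), 10);
  // Keep only values that parsed to a usable non-negative integer.
  const candidates = [persisted, live].filter(
    (n) => Number.isSafeInteger(n) && n >= 0
  );
  // Never go below either source; start from 0 when both are unusable.
  return candidates.length > 0 ? Math.max(...candidates) : 0;
}

// Usage: the file says 120 but the store was reset to 118.
console.log(resolveStartId('120', '118'));
```

At startup, the result would be written back to the counter before the first event is served.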
Avoid blocking the write loop: generate IDs asynchronously, push the formatted frames to an internal queue, then flush them to the HTTP response stream in order.
const eventQueue = [];
let isFlushing = false; // reassigned below, so it cannot be const

async function enqueueEvent(payload) {
  const id = await getNextId();
  eventQueue.push(formatSSEFrame(id, payload));
  flushQueue();
}

// res is assumed to be the open HTTP response for the SSE connection.
function flushQueue() {
  if (isFlushing || eventQueue.length === 0) return;
  isFlushing = true;
  while (eventQueue.length > 0) {
    const frame = eventQueue.shift();
    res.write(frame);
  }
  isFlushing = false;
}
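Track unconfirmed IDs to handle client disconnects without breaking sequence continuity. On reconnection, drop frames whose ID is at or below the client’s Last-Event-ID, since those were already delivered.
// Key: socket.remoteAddress, Value: lastAckedId
// Note: clients behind the same NAT or proxy share a remote address.
const ackState = new Map();
function handleReconnect(req, res) {
  const parsed = parseInt(req.headers['last-event-id'] || '0', 10);
  // Fall back to 0 when the header is malformed (parseInt returns NaN).
  const lastId = Number.isNaN(parsed) ? 0 : parsed;
  ackState.set(req.socket.remoteAddress, lastId);
  // Resume stream from lastId + 1
}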
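Resuming from lastId + 1 requires the server to still hold the frames the client missed. One way to sketch this, assuming numeric IDs and a fixed in-memory window, is a bounded replay buffer; `ReplayBuffer` and its methods are hypothetical names, not part of any library.

```javascript
// Hypothetical bounded buffer of recently sent frames, keyed by numeric ID.
class ReplayBuffer {
  constructor(capacity) {
    this.capacity = capacity;
    this.entries = []; // kept in ascending ID order
  }

  // Records a frame; IDs must arrive in strictly increasing order.
  push(id, frame) {
    const last = this.entries[this.entries.length - 1];
    if (last && id <= last.id) {
      throw new RangeError(`non-monotonic id ${id} after ${last.id}`);
    }
    this.entries.push({ id, frame });
    // Evict the oldest frame once the window is full.
    if (this.entries.length > this.capacity) this.entries.shift();
  }

  // Returns the frames with an ID above lastId, or null when the frame
  // right after lastId has already been evicted and a gap is unavoidable.
  since(lastId) {
    const oldest = this.entries[0];
    if (oldest && lastId < oldest.id - 1) return null;
    return this.entries.filter((e) => e.id > lastId).map((e) => e.frame);
  }
}

// Usage: keep the last 3 frames, then replay for a client that saw ID 3.
const replay = new ReplayBuffer(3);
for (let i = 1; i <= 5; i += 1) replay.push(i, `id: ${i}\ndata: {}\n\n`);
console.log(replay.since(3).length);
```

A null result signals that the client has fallen outside the window, so the caller can choose a full resync instead of silently skipping events. Connections that arrive without a Last-Event-ID header would bypass the buffer entirely.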
Verify monotonicity by asserting current_id > last_seen_id on every client reconnect. Instrument Prometheus counters to track sequence drift and delivery anomalies.
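The assertion above can be sketched as a small classifier that also tallies anomalies. This is an illustration only: `checkSequence` and the `sequenceTallies` object are hypothetical, and the plain in-memory counts stand in for whatever Prometheus client library the service already uses.

```javascript
// Hypothetical in-memory tallies; a real exporter would publish these
// through the service's Prometheus client instead.
const sequenceTallies = { gaps: 0, nonMonotonic: 0 };

// Compares an ID with the previous one seen on the same stream.
// Returns 'ok', 'gap', or 'non_monotonic' and updates the tallies.
function checkSequence(lastSeenId, currentId) {
  if (currentId <= lastSeenId) {
    // Covers both duplicates and IDs that moved backwards.
    sequenceTallies.nonMonotonic += 1;
    return 'non_monotonic';
  }
  if (currentId - lastSeenId > 1) {
    // At least one ID was skipped between the two frames.
    sequenceTallies.gaps += 1;
    return 'gap';
  }
  return 'ok';
}

// Usage: one clean step, one skipped ID, one repeated ID.
console.log(checkSequence(41, 42), checkSequence(42, 44), checkSequence(44, 44));
```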
Prometheus Metrics Configuration
- job_name: 'sse_stream_monitor'
  scrape_interval: 15s
  metrics_path: '/metrics'
  static_configs:
    - targets: ['localhost:9090']
Custom Metrics to Export
sse_id_gaps_total: Counter for detected sequence jumps (current_id - last_seen_id > 1).
duplicate_delivery_rate: Gauge tracking frames sent with identical IDs within a 5s window.
last_event_id_mismatch_total: Counter for Last-Event-ID mismatches during reconnection.
DevTools Validation Steps
1. Open the browser DevTools Network panel and select the sse-stream request.
2. Open the EventStream response tab. Verify id: fields increment by exactly 1.
3. Set the network to Offline for 3s, then restore.
4. Confirm Last-Event-ID: <last_received> in the GET request header.
5. Confirm the stream resumes at <last_received + 1> without duplicating payloads.
Load Test Script (k6)
import http from 'k6/http';
import { check, sleep } from 'k6';
export default function () {
  const res = http.get('http://localhost:3000/events', {
    headers: { 'Last-Event-ID': '42' }
  });
  check(res, {
    'status is 200': (r) => r.status === 200,
    'resumes at 43': (r) => r.body.includes('id: 43\n'),
    'no duplicate 42': (r) => !r.body.includes('id: 42\n')
  });
  sleep(1);
}
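The substring checks in the script confirm that ID 43 appears and ID 42 does not, but they do not show that every later ID rises by exactly 1. A stricter check could parse all IDs out of the response body, as in the sketch below. Both function names are hypothetical, and the code assumes purely numeric IDs; it uses plain JavaScript with no Node-specific APIs.

```javascript
// Extracts numeric event IDs from a raw text/event-stream body.
function parseEventIds(body) {
  const ids = [];
  // SSE lines may end with LF, CRLF, or CR.
  for (const line of body.split(/\r\n|\n|\r/)) {
    // The field name is followed by a colon and one optional space.
    const match = /^id: ?(\d+)$/.exec(line);
    if (match) ids.push(Number(match[1]));
  }
  return ids;
}

// True when at least one ID is present, the first one is
// lastEventId + 1, and each following ID rises by exactly 1.
function resumesCleanly(ids, lastEventId) {
  return ids.length > 0 &&
    ids.every((id, index) => id === lastEventId + 1 + index);
}

// Usage with a captured response body.
const sampleBody = 'id: 43\ndata: {"n":1}\n\nid: 44\ndata: {"n":2}\n\n';
console.log(resumesCleanly(parseEventIds(sampleBody), 42));
```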
Review Idempotent Event ID Generation patterns to ensure retry logic does not emit duplicate frames. Execute load tests simulating forced disconnects and validate that the client resumes exactly at the next sequential ID without payload duplication or state corruption.