Syncing SSE Streams with Redux State Permalink to this section

Part of State-Management Integration for SSE.

You have an SSE endpoint pushing order updates, chat messages, or sensor readings. You open EventSource, receive events in a useEffect, and call dispatch() inline β€” then watch the store fill with duplicates on reconnect, stale cursor state after a network blip, or lost events because the reducer ran before the normalized schema was ready. This guide shows you the exact middleware pattern that eliminates those problems: a Redux middleware manages the EventSource lifecycle, dispatches typed actions on every message, and a normalized entity slice deduplicates by event.lastEventId.

Symptom & Developer Intent Permalink to this section

The symptoms arrive in three recognizable clusters:

  1. Duplicate entities in the store β€” the client reconnects after a 30-second network gap. The server replays events since Last-Event-ID, but the reducer push()es them unconditionally, so the list now contains two copies of the same record with the same ID.
  2. Race between connection setup and dispatch β€” component mounts, dispatches OPEN_STREAM, then a rapid state change unmounts it before the first message arrives. The orphaned EventSource keeps firing; the next mount creates a second one. The store now receives interleaved events from two streams.
  3. Stale retry cursor β€” the server sends retry: 5000\n and an id: field. The client ignores both, manually delays with setTimeout(3000), and misses the resumption sequence, causing the server to re-send from the wrong offset.

The developer intent is to have Redux own the stream β€” opening it, tearing it down, tracking connection state, and merging incoming events idempotently into an entity-normalized slice.

Root Cause Analysis Permalink to this section

Why inline useEffect dispatch fails Permalink to this section

EventSource is a long-lived browser object that does not fit React’s effect model cleanly. An effect runs after every render whose deps change; if dispatch is stable (it is, because Redux dispatch never changes reference) but url or authToken changes, the old EventSource is closed, a new one opened, and the Last-Event-ID is the browser’s last tracked value β€” which is only correct if the server sends id: fields on every event. See Event ID & Retry Mechanism Design for why the browser only tracks the last seen ID, not a cursor range.

Why the server replays and Redux deduplicates poorly Permalink to this section

The server rewinds to the Last-Event-ID the browser sends in the reconnect request. If your store reducer appends naively, you get duplicates. The fix is idempotent event ID generation on the server paired with upsertMany / upsertOne in a Redux Toolkit entity adapter on the client.

Why middleware beats component code Permalink to this section

Redux middleware runs outside React’s render cycle. It can hold the EventSource reference across component unmounts, keep a stable dispatch reference, and enforce a single-instance invariant (no duplicate connections). Components become pure consumers of derived selectors β€” they never touch the stream directly.

Approach Single instance Survives unmount Dedup support Reconnect control
useEffect inline No No Manual Manual
Custom hook + ref Partial No Manual Manual
Redux middleware Yes Yes Built-in via adapter Full
RTK Query (streaming) Yes Yes Partial (tags) Limited

Step-by-Step Resolution Permalink to this section

Step 1 β€” Install Redux Toolkit and set up the store Permalink to this section

npm install @reduxjs/toolkit react-redux
// src/store/index.ts
import { configureStore } from '@reduxjs/toolkit';
import { sseMiddleware } from './sseMiddleware';
import { ordersReducer } from './ordersSlice';

export const store = configureStore({
  reducer: {
    orders: ordersReducer,
  },
  middleware: (getDefaultMiddleware) =>
    getDefaultMiddleware().concat(sseMiddleware),
});

export type RootState = ReturnType<typeof store.getState>;
export type AppDispatch = typeof store.dispatch;

Step 2 β€” Define the entity slice with createEntityAdapter Permalink to this section

createEntityAdapter gives you upsertOne / upsertMany which are inherently idempotent: inserting an entity whose id already exists updates it in place rather than appending a duplicate.

// src/store/ordersSlice.ts
import { createSlice, createEntityAdapter, PayloadAction } from '@reduxjs/toolkit';

export interface Order {
  id: string;          // matches SSE event id field
  status: 'pending' | 'processing' | 'shipped' | 'delivered';
  updatedAt: number;   // epoch ms from server payload
  payload: unknown;
}

const adapter = createEntityAdapter<Order>({
  selectId: (o) => o.id,
  sortComparer: (a, b) => a.updatedAt - b.updatedAt,
});

export const ordersSlice = createSlice({
  name: 'orders',
  initialState: adapter.getInitialState({
    connectionStatus: 'closed' as 'closed' | 'connecting' | 'open' | 'error',
    lastEventId: null as string | null,
  }),
  reducers: {
    // Dispatched by middleware on EventSource.onopen
    streamOpened(state) {
      state.connectionStatus = 'open';
    },
    // Dispatched by middleware on EventSource.onerror
    streamError(state) {
      state.connectionStatus = 'error';
    },
    // Dispatched by middleware when EventSource is created
    streamConnecting(state) {
      state.connectionStatus = 'connecting';
    },
    // Dispatched by middleware when EventSource.close() is called
    streamClosed(state) {
      state.connectionStatus = 'closed';
    },
    // Dispatched by middleware on each 'message' or named event
    eventReceived(state, action: PayloadAction<{ eventId: string; data: Order }>) {
      const { eventId, data } = action.payload;
      state.lastEventId = eventId;
      adapter.upsertOne(state, { ...data, id: eventId });
    },
    // Bulk replay on reconnect β€” also idempotent
    eventsReplayed(state, action: PayloadAction<Order[]>) {
      adapter.upsertMany(state, action.payload);
    },
  },
});

export const {
  streamOpened, streamError, streamConnecting, streamClosed,
  eventReceived, eventsReplayed,
} = ordersSlice.actions;

export const ordersReducer = ordersSlice.reducer;
export const ordersSelectors = adapter.getSelectors((s: { orders: ReturnType<typeof adapter.getInitialState> }) => s.orders);

Step 3 β€” Write the SSE middleware Permalink to this section

The middleware intercepts two action types: sse/connect and sse/disconnect. It holds the EventSource in module scope so it survives component unmounts.

// src/store/sseMiddleware.ts
import { Middleware } from '@reduxjs/toolkit';
import {
  streamOpened, streamError, streamConnecting,
  streamClosed, eventReceived,
} from './ordersSlice';

// Action type constants β€” dispatch these from components
export const SSE_CONNECT    = 'sse/connect';
export const SSE_DISCONNECT = 'sse/disconnect';

let es: EventSource | null = null;

export const sseMiddleware: Middleware = (storeAPI) => (next) => (action) => {
  if (action.type === SSE_CONNECT) {
    // Enforce single-instance invariant
    if (es) {
      es.close();
      es = null;
    }

    const { url } = action.payload as { url: string };
    storeAPI.dispatch(streamConnecting());

    es = new EventSource(url, { withCredentials: true });

    es.onopen = () => {
      storeAPI.dispatch(streamOpened());
    };

    es.onerror = () => {
      storeAPI.dispatch(streamError());
      // Browser handles reconnect automatically via retry interval.
      // Do not close here unless you want to suppress auto-reconnect.
    };

    // Listen to named 'order-update' events; fall back to generic 'message'
    const handleEvent = (e: MessageEvent) => {
      try {
        const data = JSON.parse(e.data) as import('./ordersSlice').Order;
        const eventId = e.lastEventId || data.id;
        storeAPI.dispatch(eventReceived({ eventId, data }));
      } catch {
        // Malformed JSON β€” log and skip; do not crash the stream
        console.warn('[sse] failed to parse event data', e.data);
      }
    };

    es.addEventListener('order-update', handleEvent);
    es.onmessage = handleEvent; // generic fallback

    return next(action);
  }

  if (action.type === SSE_DISCONNECT) {
    if (es) {
      es.close();
      es = null;
    }
    storeAPI.dispatch(streamClosed());
    return next(action);
  }

  return next(action);
};

Step 4 β€” Connect and disconnect from components Permalink to this section

Components dispatch plain action objects β€” they never touch EventSource directly.

// src/components/OrderDashboard.tsx
import React, { useEffect } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { SSE_CONNECT, SSE_DISCONNECT } from '../store/sseMiddleware';
import { ordersSelectors } from '../store/ordersSlice';
import type { RootState } from '../store';

export function OrderDashboard() {
  const dispatch = useDispatch();
  const orders   = useSelector(ordersSelectors.selectAll);
  const status   = useSelector((s: RootState) => s.orders.connectionStatus);

  useEffect(() => {
    dispatch({ type: SSE_CONNECT, payload: { url: '/api/orders/stream' } });
    return () => {
      dispatch({ type: SSE_DISCONNECT });
    };
  }, []); // empty deps: open once, close on unmount

  return (
    <div>
      <p>Stream: {status}</p>
      <ul>
        {orders.map((o) => (
          <li key={o.id}>{o.id} β€” {o.status}</li>
        ))}
      </ul>
    </div>
  );
}

Step 5 β€” Handle Last-Event-ID on reconnect Permalink to this section

The browser automatically sends the Last-Event-ID header on every reconnect attempt (as long as the server sent id: fields). Your server must honour it. On the client, upsertOne in the reducer ensures replayed events are merged, not appended.

For streams where the server cannot resume from a cursor (e.g., live tickers), send a synthetic id: 0 on the first event and include retry: 5000\n so the browser uses 5 seconds instead of the default 3. See Setting the retry Interval in SSE Streams for server-side configuration.

// src/store/sseMiddleware.ts (addition: pass lastEventId on reconnect)
// Before creating the EventSource, read the last known ID from store state:
const lastEventId = storeAPI.getState().orders.lastEventId;
const urlWithCursor = lastEventId
  ? `${url}?lastEventId=${encodeURIComponent(lastEventId)}`
  : url;

es = new EventSource(urlWithCursor, { withCredentials: true });
// The browser also sends Last-Event-ID header automatically if it tracked one.
// The query-param approach handles the very first reconnect after a page reload.

Validation & Monitoring Permalink to this section

Verify deduplication with Redux DevTools Permalink to this section

  1. Open Chrome DevTools β†’ Redux DevTools extension.
  2. Trigger a manual disconnect (dispatch({ type: 'sse/disconnect' })).
  3. Reconnect β€” the server should replay recent events with the same id: values.
  4. In the DevTools diff view, check that orders.ids length does not increase for replayed IDs.

Unit-test the reducer’s idempotency Permalink to this section

// src/store/ordersSlice.test.ts
import { ordersReducer, eventReceived } from './ordersSlice';

test('upsertOne does not duplicate on replay', () => {
  const order = { id: 'evt-42', status: 'pending' as const, updatedAt: 1000, payload: {} };
  const action = eventReceived({ eventId: 'evt-42', data: order });

  const stateAfterFirst  = ordersReducer(undefined, action);
  const stateAfterSecond = ordersReducer(stateAfterFirst, action); // replay

  expect(stateAfterSecond.ids).toHaveLength(1); // no duplicate
  expect(stateAfterSecond.entities['evt-42']?.status).toBe('pending');
});

Verify stream lifecycle with curl Permalink to this section

# Confirm the server sends id: fields on every event
curl -N -H "Accept: text/event-stream" http://localhost:3000/api/orders/stream | head -30
# Expected output per event:
# id: evt-100
# event: order-update
# data: {"id":"ord-55","status":"processing","updatedAt":1718000000000}
#

If id: is absent, e.lastEventId in the middleware will be an empty string β€” fall back to data.id as shown in Step 3.

Monitor connection-status selector in Cypress Permalink to this section

// cypress/e2e/sse-redux.cy.ts
it('shows "open" after stream connects', () => {
  cy.visit('/orders');
  cy.contains('Stream: open', { timeout: 5000 });
});

it('deduplicates on reconnect', () => {
  cy.visit('/orders');
  cy.contains('Stream: open', { timeout: 5000 });
  // Count rows before
  cy.get('li').its('length').as('before');
  // Force a reconnect by intercepting and dropping the connection
  cy.intercept('GET', '/api/orders/stream').as('sse');
  cy.wait('@sse');
  // After reconnect, row count should remain the same
  cy.get('@before').then((before) => {
    cy.get('li').should('have.length', before as unknown as number);
  });
});

Production Checklist Permalink to this section

⚑ Production Directives

  • Use upsertOne/upsertMany from createEntityAdapter β€” never addOne or array push β€” to guarantee idempotency across reconnects.
  • Keep the EventSource reference in middleware scope, not component state; this survives React unmount/remount cycles without spawning duplicate connections.
  • Always catch JSON parse errors in the middleware event handler and log them without re-throwing β€” a malformed payload must not break the stream.
  • Persist lastEventId to localStorage and attach it as a query param on the initial connect action so cross-tab and post-reload reconnects resume from the correct cursor.
  • Expose connectionStatus as a selector to your monitoring dashboard; alert when the status stays in 'error' for more than 15 seconds.

Frequently Asked Questions Permalink to this section

Can I use RTK Query instead of custom middleware?

RTK Query supports streaming updates via onCacheEntryAdded and updateCachedData. It works well for simple cases but gives you less control over the EventSource lifecycle, connection status tracking, and cross-component singleton enforcement. The custom middleware approach described here is more explicit and easier to debug with Redux DevTools.

What happens if two components both dispatch sse/connect?

The middleware closes the existing EventSource before opening a new one (see the single-instance invariant in Step 3). As long as both dispatches target the same URL, the second dispatch is a no-op net effect. Coordinate with an initialization flag in the store β€” dispatch sse/connect only from a top-level layout component or an app-wide initializer, not from individual list items.

How do I scope the stream to a specific authenticated user?

Pass the URL with an auth token as a query parameter in the sse/connect payload, or set withCredentials: true to send the session cookie. The server validates the credential on each reconnect attempt. See Authenticating SSE Streams with Tokens & Cookies for the full pattern.

How do I handle multiple named event types from a single stream?

Add additional es.addEventListener('event-name', handler) calls in the middleware. Each handler dispatches a different Redux action, which can target a different slice. The entity adapter approach composes cleanly: each slice has its own adapter and upsertOne reducer.

Why does e.lastEventId return an empty string sometimes?

The lastEventId property on MessageEvent is empty when the server does not send an id: field, or sends id: with no value (which resets the tracked ID to the empty string). Always fall back to a stable unique field in the event's JSON payload β€” for example, a database row ID or a UUIDv7 generated server-side. See Idempotent Event ID Generation for server-side strategies.