javascript

SSE в production: почему нативного EventSource недостаточно и что с этим делать

  • вторник, 9 июня 2026 г. в 00:00:16
https://habr.com/ru/articles/1044808/

Введение

Год назад я строил real-time слой для AI SaaS-платформы. Корпоративные клиенты, AI-чаты со стримингом ответов, несколько вкладок открытых одновременно — типичный сценарий для подобного продукта.

Выбор пал на SSE: в отличие от WebSocket, SSE работает поверх обычного HTTP, хорошо дружит с прокси и балансировщиками, и для однонаправленного стриминга от сервера к клиенту это достаточно.

Нативный EventSource подключил быстро. Потом начались проблемы.

Первая — авторизация. EventSource не умеет слать заголовки. Вообще. Единственный способ передать токен — query-параметр в URL, что для корпоративного продукта просто не вариант.

Вторая — типизация. Нативный API возвращает MessageEvent без какой-либо типизации полезной нагрузки. Весь парсинг и приведение типов — на совести разработчика.

Третья — реконнект. EventSource реконнектится, но без контроля: нет jitter, нет exponential backoff, нет ограничения попыток. Сервер упал — клиенты начинают переподключаться одновременно, создавая thundering herd.

Четвёртая — вкладки. Каждая вкладка открывает своё соединение. Три вкладки — три потока на сервер от одного пользователя. При корпоративных клиентах которые держат продукт открытым весь рабочий день это ощутимо.

Написал всё сам. Работало. Но со временем этот код стал самым хрупким местом в приложении — гонки, ручной leader election, reconnect-планировщик, счётчики попыток. Когда что-то шло не так — открывал пять файлов и час разбирался где потерялся event.

Когда искал готовую библиотеку которая решает всё это — не нашёл. Либо слишком простые обёртки над EventSource, либо привязка к конкретному фреймворку, либо отсутствие типизации.

Решил написать сам. Так появился sse-runtime.


Архитектура sse-runtime

Библиотека состоит из трёх npm-пакетов под @flamefrontend/* скоупом:

@flamefrontend/sse-runtime-core     — framework-agnostic ядро, zero runtime dependencies
@flamefrontend/sse-runtime-react    — React-адаптер: useSSE хук и SSEProvider
@flamefrontend/sse-runtime-devtools — DevTools-панель для инспекции соединений

Такое разделение намеренное. Core не зависит ни от чего — ни от React, ни от каких-либо сторонних библиотек. Весь runtime построен на браузерных нативных API: fetch, ReadableStream, AbortController, TextDecoder, BroadcastChannel, Web Locks. Это значит что core можно использовать в Vue, Svelte, Angular или вообще без фреймворка — React-пакет опциональный.

React-пакет — тонкий адаптер поверх core. Он добавляет lifecycle management: auto-connect при монтировании, auto-disconnect при анмаунте, а также подписки на изменения состояния которые вызывают перерисовку компонентов.

DevTools — отдельный drop-in компонент. Он не зависит от internals core — интегрируется через контекст который экспортирует React-пакет.

Внутренние модули core

Внутри core каждый модуль отвечает за одну задачу:

Модуль

Задача

create-local-sse-client

Основной connection loop, state machine, backoff

create-coordinated-sse-client

Leader election + BroadcastChannel fan-out

client-state

Реактивный контейнер статуса и ошибок

read-sse-stream

Чтение байтового потока с heartbeat timeout

parse-sse-chunk

Stateful парсер SSE-фреймов, включая частичные чанки

dispatch-sse-event

JSON-парсинг + вызов обработчиков

create-fetch-transport

fetch-обёртка с правильными SSE-заголовками

calculate-reconnect-delay

Jittered exponential backoff

refresh-auth

Вызов onUnauthorized, возврат флага retry

coordination-backend

Абстракция над BroadcastChannel + Web Locks

State machine соединения

Соединение проходит через следующие состояния:

idle — начальное состояние, соединение не открыто.
connecting — fetch отправлен, ждём ответа.
open — стрим читается, события диспатчатся.
reconnecting — соединение потеряно, запланирован reconnect с backoff.
error — исчерпаны попытки или получена фатальная ошибка.
closed — явный вызов disconnect().


Технические решения

Leader election через Web Locks + BroadcastChannel

Задача: из всех открытых вкладок одна должна держать SSE-соединение, остальные — получать события через неё.


Наивное решение — хранить флаг "я лидер" в localStorage с TTL и периодически его обновлять. Проблемы очевидны: гонка при одновременном старте нескольких вкладок, нужен heartbeat, нужна логика подхвата при падении лидера.

Web Locks решает это атомарно на уровне браузера:

const lock = await navigator.locks.request(
  `sse-leader-${key}`,
  { mode: 'exclusive' },
  async () => {
    // Открываем SSE-соединение и читаем стрим
    // Лок удерживается пока промис не разрешится
    await runAsLeader()
  }
)

Почему Web Locks, а не localStorage с TTL:

  • Атомарность — браузер гарантирует что exclusive лок держит только одна вкладка

  • Автоматическое освобождение — если вкладка закрылась или упала, лок освобождается автоматически

  • Без polling — не нужен heartbeat, не нужны таймеры проверки

  • Очередь — вкладки автоматически встают в очередь на лок. При закрытии лидера следующая подхватывает лок мгновенно

Для fan-out событий от лидера к follower-вкладкам используется BroadcastChannel:

// Лидер отправляет событие всем вкладкам
channel.postMessage({ type: 'event', name, payload })

// Follower получает и диспатчит локально
channel.onmessage = ({ data }) => {
  if (data.type === 'event') {
    dispatchEvent(data.name, data.payload)
  }
}

Через broadcast follower-вкладки получают и обновления статуса соединения, и ошибки.

Generation counters против race conditions

SSE-клиент — асинхронный по природе. Типичная race condition: пользователь быстро переключает чаты, каждый вызов connect() запускает новый fetch. Если первый запрос ответил позже второго — его callback попытается обновить состояние уже "чужого" соединения.

Решение — generation counter:

let generation = 0

function connect() {
  const currentGeneration = ++generation

  openStream().then(stream => {
    // Проверяем что мы всё ещё актуальны
    if (currentGeneration !== generation) return

    readStream(stream, (event) => {
      if (currentGeneration !== generation) return
      dispatch(event)
    })
  })
}

function disconnect() {
  generation++ // Инвалидируем все текущие async операции
  abortController.abort()
}

Каждый connect() инкрементирует счётчик и захватывает его текущее значение. Все асинхронные коллбэки — чтение стрима, задержки reconnect, вызов auth refresh — сравнивают свой generation с текущим перед тем как мутировать состояние. disconnect() инкрементирует счётчик, инвалидируя все текущие операции.

Generation counters защищают от stale callbacks — это то чего AbortController не покрывает. В библиотеке используются оба: AbortController прерывает fetch, generation counters защищают всю цепочку async операций после него.


Stale-stream watchdog

Браузер не всегда сообщает о смерти SSE-соединения. Несколько сценариев где это происходит:

  • Сон ноутбука — соединение зависает, браузер не эмитит error

  • Потеря Wi-Fi без разрыва TCP — соединение выглядит живым, события не приходят

  • Wake drift — после выхода из сна системные таймеры могут сдвинуться, setTimeout срабатывает позже ожидаемого

Watchdog решает это через heartbeat timeout:

function readStreamWithWatchdog(
  stream: ReadableStream<Uint8Array>,
  onEvent,
  timeout = 45_000
) {
  let watchdogTimer: ReturnType<typeof setTimeout>
  const reader = stream.getReader()
  const decoder = new TextDecoder()

  function resetWatchdog() {
    clearTimeout(watchdogTimer)
    watchdogTimer = setTimeout(() => {
      // reader.cancel() завершает pump() — без него read() будет висеть вечно
      reader.cancel()
      onStaleStream()
    }, timeout)
  }

  resetWatchdog()

  async function pump() {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      resetWatchdog() // Сбрасываем при каждом чанке
      processChunk(decoder.decode(value, { stream: true }))
    }
  }

  return pump()
}

При каждом полученном чанке данных таймер сбрасывается. Если сервер молчит дольше timeout — считаем соединение зависшим и инициируем reconnect. Сервер может слать пустые keep-alive комментарии :\n\n, чтобы держать watchdog живым между реальными событиями.

Wake drift и потеря сети обрабатываются отдельно — через три браузерных события: visibilitychange — возврат из фонового режима, online — восстановление сети, focus — возврат фокуса на вкладку. При срабатывании любого из них проверяем сколько времени прошло с последнего события и при необходимости переподключаемся не дожидаясь таймаута.


React-интеграция

Проблема бесконечных реконнектов

Наивная реализация useSSE хука выглядит так:

function useSSE({ url, headers, onMessage }) {
  useEffect(() => {
    const client = createSSEClient({
      url,
      headers,
      events: {
        message: onMessage
      }
    })

    client.connect()

    return () => client.disconnect()
  }, [url, headers, onMessage]) // ← проблема здесь
}

Если headers или onMessage — это функции созданные при каждом рендере, зависимости меняются на каждый рендер. useEffect пересоздаёт клиент, клиент переподключается. Бесконечный цикл реконнектов.

Стандартный совет — обернуть в useCallback. Но это перекладывает ответственность на пользователя, и одна забытая зависимость ломает поведение.

Transport identity vs handler identity

Ключевое решение хука: разделить опции на два класса.

Transport identity — опции которые требуют нового соединения при изменении:

  • key — идентификатор стрима

  • url — endpoint

  • enabled — включён ли стрим

  • credentials — режим credentials для fetch

  • coordination.mode

Handler identity — опции которые можно обновить без переподключения:

  • функции-обработчики событий

  • headers — async функция получения токена

  • auth.onUnauthorized

  • опции reconnect

  • коллбэки diagnostics

Имена событий намеренно убраны из transport identity. В SSE сервер шлёт что шлёт — клиент просто игнорирует неизвестные типы. Добавление нового обработчика не требует переподключения.

Для handler identity используется useLatestRef:

function useLatestRef<T>(value: T): React.RefObject<T> {
  const ref = useRef(value)

  // Синхронное обновление в теле функции — до того как эффекты запустятся.
  // Если обновлять ref в useEffect, между рендером и эффектом есть момент
  // когда ref устарел и старый обработчик может получить событие.
  ref.current = value

  return ref
}

Внутри core все handler-опции читаются через ref при каждом вызове — не захватываются в замыкание при создании клиента. Это значит что новая функция onMessage от рендера подхватывается мгновенно без пересоздания соединения.

buildEventProxies создаёт прокси-обработчики для каждого типа события: при вызове каждый из них обращается к optionsRef.current.events[name] — всегда актуальная функция без захвата в замыкание при создании клиента.

function useSSE<Events>(options: SSEOptions<Events>) {
  // Transport identity — изменение требует нового соединения.
  // Используем примитивы — JSON.stringify на объектах ненадёжен из-за порядка ключей.
  const transportKey = useMemo(
    () => JSON.stringify([
      options.key,
      options.url,
      options.enabled,
      options.credentials
    ]),
    [
      options.key,
      options.url,
      options.enabled,
      options.credentials
    ]
  )

  // Handler identity — обновляется без переподключения через ref
  const optionsRef = useLatestRef(options)

  useEffect(() => {
    const client = createSSEClient({
      ...optionsRef.current,
      headers: () => optionsRef.current.headers?.(),
      events: buildEventProxies(optionsRef),
    })

    client.connect()

    return () => client.disconnect()
  }, [transportKey]) // ← только transport identity в зависимостях
}

Пользователь не думает о useCallback — хук сам разбирается что требует реконнекта, а что нет.


Результат внедрения

Когда sse-runtime был готов, я внедрил его в то самое AI SaaS-приложение с которого всё началось.

Статистика PR:

Файлов изменено:  23
Строк удалено:    3 643
Строк добавлено:  188
Итого:            −3 455
Соотношение:      1:19

Но цифры — следствие, а не цель. Вот что реально изменилось:

Удалено:

  • Самописный reconnect-планировщик с ручным backoff

  • Ручной leader election через localStorage + TTL + heartbeat

  • Самописный stale-stream детектор

  • DevTools-виджет для отладки соединений, примерно 1 000 строк

  • Весь код синхронизации статуса между вкладками

Добавлено:

const { status, error } = useSSE<ChatEvents>({
  key: ["chat", chatId],
  url: `/api/chats/${chatId}/stream`,
  headers: async () => ({
    Authorization: `Bearer ${await getToken()}`
  }),
  events: {
    message: (msg) => appendMessage(msg),
    done: (d) => markDone(d.chatId),
  },
  auth: {
    onUnauthorized: refreshToken,
    retryAfterRefresh: true
  },
  coordination: {
    mode: "single-tab"
  },
})

Прикладной код остался с одной задачей: обработать событие. Вся инфраструктурная сложность — в библиотеке за 229 unit-тестами.


Тестирование

229 тестов в 23 файлах покрывают каждый модуль изолированно:

Область

Что тестируется

SSE frame parsing

Частичные чанки, multi-field события, BOM, retry parsing

Stream reading

Штатное завершение стрима, heartbeat timeout, abort, transport errors

Event dispatch

JSON парсинг, ошибки в обработчиках, неизвестные события

Reconnect backoff

Формула задержки, server retry override, maxRetries, jitter bounds

Auth refresh

401 handling, retry flag, сценарии падения refresh

Coordination

Leader election, follower state sync, broadcast messages

Client state

Subscribe/unsubscribe, переходы статусов, error state

React hook

Mount/unmount lifecycle, стабильность опций, event subscriptions

React options

Свежесть обработчиков через useLatestRef, условия пересоздания клиента

Особенно важны тесты на reconnect backoff — формула base * 2^(attempt - 1) + jitter с учётом server retry: поля и maxRetries. Без тестов это место легко сломать "улучшением".


Что дальше

Ближайшие планы:

React Native поддержка. Сейчас core использует BroadcastChannel и Web Locks — браузерные API которых нет в React Native. Планирую сделать заменяемый бэкенд координации: в браузере — текущая реализация, в React Native — no-op или альтернатива через AsyncStorage. Это позволит использовать один пакет в обоих окружениях и убрать дублирование логики между двумя репозиториями.

SSE Server helpers. Утилиты для Node.js/Edge серверов — чтобы формат событий на сервере совпадал с тем что ожидает клиент.


Ссылки

Если работаете с SSE в продакшне и сталкивались с похожими проблемами — интересно услышать как решали.

Telegram: @Artem_Kaliganov