javascript

Публикация событий из Laravel в Centrifugo: Events, listeners, queue jobs и безопасный real-time pa…

  • вторник, 19 мая 2026 г. в 00:00:05
https://habr.com/ru/articles/1036322/

Real-time в Laravel-проекте не должен начинаться с прямого HTTP-запроса к Centrifugo из контроллера. Это быстрый путь к связанному коду, нестабильной архитектуре и ошибкам синхронизации между frontend и backend. Правильная публикация событий из Laravel в Centrifugo строится иначе: Laravel меняет состояние системы, создаёт событие приложения, listener передаёт задачу в queue job, а job уже публикует минимальный payload в Centrifugo.

Такой подход особенно важен для проектов, где real-time обновления связаны с заказами, платежами, уведомлениями, чатами, административными панелями и любыми статусами, которые пользователь должен увидеть без перезагрузки страницы. Centrifugo отвечает за WebSocket-доставку сообщений, но Laravel остаётся источником истины: именно backend проверяет права, сохраняет данные и решает, какое событие действительно произошло.

В этой статье разберём, как правильно организовать публикацию real-time событий в Laravel: почему не стоит отправлять сообщения напрямую из контроллеров, как использовать Laravel Events и listeners, зачем нужны queue jobs, почему afterCommit() критичен для корректности данных и каким должен быть безопасный payload без полной Eloquent-модели.

Почему нельзя публиковать события в Centrifugo напрямую из контроллера

Контроллер в Laravel должен оставаться входной точкой HTTP-запроса. Его задача — принять request, передать управление прикладному слою и вернуть response. Когда в контроллер добавляют публикацию события в Centrifugo, он начинает отвечать сразу за несколько уровней: HTTP, бизнес-логику, работу с базой данных, формат real-time события, имя канала и сетевой вызов к WebSocket-серверу.

На небольшом примере это может выглядеть безобидно:

namespace App\Http\Controllers\Order;

use App\Models\Order;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Http;

class PayOrderController
{
    public function __invoke(Request $request, int $orderId): JsonResponse
    {
        $order = Order::query()->findOrFail($orderId);

        $order->update([
            'status' => 'paid',
        ]);

        Http::withHeaders(headers: [
            'Authorization' => 'apikey ' . config('services.centrifugo.api_key'),
        ])->post(
            url: config('services.centrifugo.api_url') . '/api/publish',
            data: [
                'channel' => '$users:' . $order->user_id,
                'data' => [
                    'type' => 'order.status.changed',
                    'orderId' => $order->id,
                    'status' => $order->status,
                ],
            ],
        )->throw();

        return response()->json(data: [
            'success' => true,
        ]);
    }
}

Проблема не в том, что код не работает. Он работает. До тех пор, пока Centrifugo доступен, сетевой вызов быстрый, формат события не меняется, контроллер не разрастается и никто не пытается покрыть это нормальными тестами. То есть, как обычно, всё хорошо ровно до production.

У такого подхода есть несколько архитектурных недостатков.

Во-первых, HTTP-запрос пользователя начинает зависеть от доступности Centrifugo. Если real-time сервер временно недоступен, основной бизнес-процесс может завершиться ошибкой, хотя заказ уже успешно изменён в базе данных.

Во-вторых, контроллер начинает знать слишком много: имя канала, структуру payload, способ авторизации в Centrifugo API и детали доставки события.

В-третьих, публикацию события сложно переиспользовать. Если тот же статус заказа изменится не из контроллера, а из webhook, command handler или фоновой задачи, логику придётся дублировать.

Правильная схема должна быть другой:

HTTP request
    ↓
Laravel меняет состояние
    ↓
Laravel создаёт событие приложения
    ↓
Listener реагирует на событие
    ↓
Queue job публикует сообщение в Centrifugo

Это базовая архитектура Laravel Centrifugo для устойчивых real-time обновлений.

Laravel Events: событие должно описывать факт, а не транспорт

Начинать нужно не с WebSocket и не с Centrifugo API. Начинать нужно с факта предметной области. Например: заказ изменил статус. Это событие приложения, а не транспортное сообщение.

Для этого можно создать событие OrderStatusChanged:

namespace App\Events;

class OrderStatusChanged
{
    public function __construct(
        public readonly int $orderId,
        public readonly int $userId,
        public readonly string $status,
    ) {
    }
}

Такой класс не должен знать о Centrifugo. В нём не должно быть имени канала, API endpoint, WebSocket-логики или структуры публикации. Он фиксирует только факт: статус заказа изменился.

Контроллер становится проще и чище:

namespace App\Http\Controllers\Order;

use App\Events\OrderStatusChanged;
use App\Models\Order;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;

class PayOrderController
{
    public function __invoke(Request $request, int $orderId): JsonResponse
    {
        DB::transaction(function () use ($orderId): void {
            $order = Order::query()->findOrFail($orderId);

            $order->update([
                'status' => 'paid',
            ]);

            event(new OrderStatusChanged(
                orderId: $order->id,
                userId: $order->user_id,
                status: 'paid',
            ));
        });

        return response()->json(data: [
            'success' => true,
        ]);
    }
}

В реальном Laravel-проекте изменение статуса лучше выносить из контроллера в action, service, use case или command handler. Но даже в упрощённом примере видно главное: контроллер больше не публикует real-time событие напрямую в Centrifugo. Он выполняет бизнес-действие и создаёт событие приложения.

Преимущество Laravel Events в том, что один факт может иметь несколько независимых реакций. Например, событие OrderStatusChanged может:

  • отправить real-time сообщение в Centrifugo;

  • записать audit log;

  • отправить email;

  • добавить запись в аналитику;

  • запустить другой фоновый процесс.

Само событие при этом остаётся стабильным и не зависит от конкретного транспорта.

Listeners: отделяем реакцию на событие от бизнес-логики

Listener нужен для реакции на событие. В нашем случае он должен инициировать публикацию real-time сообщения. Но даже listener лучше не превращать в место прямого HTTP-вызова к Centrifugo. Его задача — передать работу в очередь.

namespace App\Listeners;

use App\Events\OrderStatusChanged;
use App\Jobs\PublishOrderStatusChangedToRealtime;

class SendOrderStatusChangedToRealtime
{
    public function handle(OrderStatusChanged $event): void
    {
        PublishOrderStatusChangedToRealtime::dispatch(
            orderId: $event->orderId,
            userId: $event->userId,
            status: $event->status,
        )->afterCommit();
    }
}

Здесь важны две вещи.

Первая — listener не знает деталей Centrifugo API. Он только решает, что на событие OrderStatusChanged нужно отправить real-time публикацию.

Вторая — используется afterCommit(). Для real-time Laravel это критически важная деталь. Если событие отправить до фиксации транзакции, frontend может получить обновление раньше, чем новое состояние реально появится в базе данных.

Например, пользователь может увидеть статус paid, затем frontend сделает HTTP-запрос за актуальным состоянием заказа и получит старый статус pending. Это не «странный баг фронта». Это ошибка порядка событий на backend.

Правило простое: внешние эффекты должны происходить после commit транзакции. Публикация события в Centrifugo — это внешний эффект. Значит, отправлять её нужно только после того, как Laravel надёжно сохранил состояние.

Queue jobs: публикация в Centrifugo без блокировки HTTP-запроса

Публикация сообщения в Centrifugo — это сетевой вызов. Он может быть быстрым, а может быть медленным. Он может завершиться успешно, а может временно упасть. Поэтому его не стоит выполнять в основном HTTP-запросе.

Для этого используется queue job:

namespace App\Jobs;

use App\Services\Realtime\CentrifugoPublisher;
use Illuminate\Contracts\Queue\ShouldQueue;

class PublishOrderStatusChangedToRealtime implements ShouldQueue
{
    public function __construct(
        public readonly int $orderId,
        public readonly int $userId,
        public readonly string $status,
    ) {
    }

    public function handle(CentrifugoPublisher $publisher): void
    {
        $publisher->publish(
            channel: '$users:' . $this->userId,
            data: [
                'type' => 'order.status.changed',
                'orderId' => $this->orderId,
                'status' => $this->status,
            ],
        );
    }
}

Такой job делает одну задачу: публикует подготовленное сообщение в нужный канал Centrifugo. Он не меняет заказ, не проверяет webhook, не принимает бизнес-решения и не строит HTTP response. Это нормальное разделение ответственности.

Сервис публикации можно оформить отдельно:

namespace App\Services\Realtime;

use Illuminate\Support\Facades\Http;

class CentrifugoPublisher
{
    public function publish(
        string $channel,
        array $data,
    ): void {
        Http::withHeaders(headers: [
            'Authorization' => 'apikey ' . config('services.centrifugo.api_key'),
        ])->post(
            url: config('services.centrifugo.api_url') . '/api/publish',
            data: [
                'channel' => $channel,
                'data' => $data,
            ],
        )->throw();
    }
}

Один сервис для публикации в Centrifugo лучше, чем десятки прямых HTTP-вызовов по проекту. Его проще тестировать, логировать, расширять и заменять. Если завтра понадобится добавить retry, метрики, tracing, другой endpoint или официальный SDK, изменения останутся в одном месте.

Queue jobs дают несколько преимуществ:

  • основной HTTP-запрос не ждёт Centrifugo;

  • временная ошибка публикации не ломает бизнес-действие;

  • публикацию можно повторить;

  • ошибки можно логировать отдельно;

  • real-time транспорт становится изолированным слоем.

Для production-проекта это не усложнение, а минимальная инженерная гигиена.

Почему afterCommit() важен для real-time событий

Для real-time обновлений важен порядок. Пользователь получает событие и ожидает, что оно отражает реальное состояние системы. Особенно это важно для платежей, заказов, выплат, заявок, модерации и административных действий.

Плохой пример:

DB::transaction(function () use ($order): void {
    $order->update([
        'status' => 'paid',
    ]);

    PublishOrderStatusChangedToRealtime::dispatch(
        orderId: $order->id,
        userId: $order->user_id,
        status: 'paid',
    );
});

На первый взгляд всё нормально. Заказ обновили, job отправили. Но если job будет обработан до завершения транзакции, frontend может получить событие раньше, чем база данных зафиксирует новый статус.

Лучше так:

DB::transaction(function () use ($order): void {
    $order->update([
        'status' => 'paid',
    ]);

    PublishOrderStatusChangedToRealtime::dispatch(
        orderId: $order->id,
        userId: $order->user_id,
        status: 'paid',
    )->afterCommit();
});

Ещё лучше — создавать Laravel Event, а dispatch job выполнять в listener через afterCommit(). Тогда бизнес-факт и транспортная доставка остаются разделены.

Цепочка должна быть такой:

Транзакция началась
    ↓
Laravel изменил состояние
    ↓
Транзакция успешно завершилась
    ↓
Job отправлен в очередь
    ↓
Job публикует событие в Centrifugo
    ↓
Frontend получает real-time обновление

Если транзакция откатилась, real-time событие не должно быть отправлено. Пользователь не должен видеть состояние, которого в системе на самом деле нет.

Минимальный payload: не отправляйте Eloquent-модель в WebSocket

Одна из самых частых ошибок при публикации событий в Centrifugo — отправлять полную Eloquent-модель:

$publisher->publish(
    channel: '$users:' . $order->user_id,
    data: $order->toArray(),
);

Это плохое решение.

Во-первых, модель может содержать лишние поля. Например, технические статусы, внутренние комментарии, служебные суммы, timestamps, признаки риска, идентификаторы внешних систем или связи, которые frontend не должен получать.

Во-вторых, frontend начинает зависеть от внутренней структуры backend-модели. Любое изменение Eloquent-модели может случайно изменить WebSocket-контракт.

В-третьих, payload становится тяжёлым. Real-time сообщение должно быть коротким и понятным, а не превращаться в дамп строки из базы данных.

Правильный payload для Centrifugo должен быть минимальным:

{
  "type": "order.status.changed",
  "orderId": 7821,
  "status": "paid"
}

Для уведомления:

{
  "type": "notification.created",
  "notificationId": 981
}

Для сообщения в чате:

{
  "type": "chat.message.created",
  "roomId": 45,
  "messageId": 3001
}

Для административного события:

{
  "type": "admin.payment.risk.detected",
  "paymentId": 9912,
  "riskLevel": "high"
}

Real-time payload должен сообщать об изменении, а не заменять полноценный HTTP API. Если frontend нужны подробные данные, он может сделать обычный HTTP-запрос к Laravel и получить актуальное состояние.

Это особенно важно после reconnect. Пользователь мог потерять соединение, закрыть ноутбук, открыть вкладку позже или пропустить часть событий. Интерфейс должен уметь восстановить состояние через HTTP, а WebSocket использовать для быстрых обновлений между синхронизациями.

Поле type как основа контракта между Laravel и frontend

Поле type лучше делать обязательным для всех real-time событий. Оно позволяет frontend понять, что именно произошло.

Пример обработки события:

subscription.on('publication', function (context) {
    const event = context.data;

    if (event.type === 'order.status.changed') {
        updateOrderStatus(event.orderId, event.status);
    }

    if (event.type === 'notification.created') {
        loadNotification(event.notificationId);
    }
});

Без type frontend начинает угадывать событие по набору полей. Если есть orderId, значит это заказ. Если есть messageId, значит чат. Такой подход быстро ломается, когда real-time событий становится больше.

Для крупных проектов можно использовать более строгий формат:

{
  "type": "order.status.changed",
  "id": "evt_01",
  "occurredAt": "2026-05-09T12:00:00+00:00",
  "data": {
    "orderId": 7821,
    "status": "paid"
  }
}

Для небольших проектов достаточно простого формата:

{
  "type": "order.status.changed",
  "orderId": 7821,
  "status": "paid"
}

Главное — не отправлять случайные массивы. У real-time событий должен быть стабильный контракт.

Где хранить имена каналов и типы real-time событий

Имена каналов и типы событий не стоит размазывать строками по проекту. Если в одном месте используется $users:15, в другом users:15, а в третьем $user:15, события будут теряться. Причём выглядеть это будет не как очевидная ошибка, а как «иногда Centrifugo не доставляет сообщения». Обычно Centrifugo тут ни при чём. Просто строки разъехались.

Лучше вынести генерацию каналов в отдельный класс:

namespace App\Services\Realtime;

class RealtimeChannel
{
    public static function user(int $userId): string
    {
        return '$users:' . $userId;
    }

    public static function order(int $orderId): string
    {
        return '$orders:' . $orderId;
    }

    public static function adminPayments(): string
    {
        return '$admin:payments';
    }
}

Типы событий тоже стоит централизовать:

namespace App\Services\Realtime;

class RealtimeEventType
{
    public const ORDER_STATUS_CHANGED = 'order.status.changed';
    public const NOTIFICATION_CREATED = 'notification.created';
    public const CHAT_MESSAGE_CREATED = 'chat.message.created';
}

Тогда публикация становится понятнее:

$publisher->publish(
    channel: RealtimeChannel::user(userId: $this->userId),
    data: [
        'type' => RealtimeEventType::ORDER_STATUS_CHANGED,
        'orderId' => $this->orderId,
        'status' => $this->status,
    ],
);

Это не избыточная абстракция. Это защита от строкового хаоса. В real-time архитектуре ошибка в имени канала или типе события может стоить часов отладки.

Обработка ошибок публикации в Centrifugo

Centrifugo может быть временно недоступен. Сеть может дать сбой. API key может быть неправильным. Endpoint может вернуть ошибку. Это не должно ломать основной бизнес-процесс, если состояние уже успешно сохранено в Laravel.

Для queue job можно задать количество попыток и backoff:

namespace App\Jobs;

use App\Services\Realtime\CentrifugoPublisher;
use Illuminate\Contracts\Queue\ShouldQueue;

class PublishOrderStatusChangedToRealtime implements ShouldQueue
{
    public int $tries = 3;

    public function backoff(): array
    {
        return [5, 15, 30];
    }

    public function __construct(
        public readonly int $orderId,
        public readonly int $userId,
        public readonly string $status,
    ) {
    }

    public function handle(CentrifugoPublisher $publisher): void
    {
        $publisher->publish(
            channel: RealtimeChannel::user(userId: $this->userId),
            data: [
                'type' => RealtimeEventType::ORDER_STATUS_CHANGED,
                'orderId' => $this->orderId,
                'status' => $this->status,
            ],
        );
    }
}

Если публикация не удалась после всех попыток, ошибку нужно логировать с полезным контекстом:

event type
channel
orderId
userId
status
exception class
exception message

Не нужно логировать API key, JWT, приватные токены и чувствительные данные payload. Логи должны помогать расследованию, а не становиться отдельной утечкой.

Также важно различать критичные и некритичные события. Не каждое real-time событие должно доставляться любой ценой. Например, если пользователь пропустил событие изменения статуса заказа, frontend может восстановить актуальное состояние через HTTP. WebSocket ускоряет интерфейс, но не должен быть единственным источником правды.

Итоговая схема публикации событий из Laravel в Centrifugo

Правильная архитектурная цепочка выглядит так:

Пользователь или внешняя система выполняет действие
        ↓
Laravel проверяет запрос и права доступа
        ↓
Laravel меняет состояние в базе данных
        ↓
Laravel создаёт событие приложения
        ↓
Listener реагирует на событие
        ↓
Queue job запускается после commit транзакции
        ↓
Job публикует минимальный payload в Centrifugo
        ↓
Centrifugo доставляет событие подписчикам канала
        ↓
Frontend обновляет интерфейс

Каждый слой отвечает за свою часть:

Controller:
HTTP-вход и response.

Application/domain layer:
изменение состояния.

Laravel Event:
фиксация факта.

Listener:
реакция на факт.

Queue job:
асинхронная доставка.

CentrifugoPublisher:
публикация в Centrifugo API.

Centrifugo:
WebSocket-доставка подписчикам.

Frontend:
обновление интерфейса.

Эта схема длиннее прямого вызова из контроллера, но она надёжнее. Она лучше тестируется, проще расширяется и меньше зависит от случайных сетевых ошибок.

Практические правила для Laravel real-time событий

Для публикации событий из Laravel в Centrifugo стоит придерживаться нескольких правил.

Не публиковать real-time события напрямую из контроллеров. Контроллер не должен знать детали Centrifugo API.

Использовать Laravel Events для фиксации бизнес-фактов. Событие должно описывать, что произошло, а не как это доставить во frontend.

Использовать listeners для реакции на событие. Listener не должен превращаться в большой транспортный сервис.

Публиковать сообщения через queue jobs. Сетевой вызов к Centrifugo лучше выполнять асинхронно.

Отправлять job после commit транзакции. Frontend не должен получать событие о состоянии, которое ещё не зафиксировано в базе данных.

Делать payload минимальным. В событии нужны type, идентификатор сущности, статус или несколько конкретных полей. Полную Eloquent-модель отправлять нельзя.

Хранить имена каналов и типы событий централизованно. Это снижает риск строковых ошибок.

Обрабатывать ошибки публикации отдельно. Ошибка WebSocket-доставки не должна автоматически ломать основной бизнес-процесс.

Заключение

Публикация событий из Laravel в Centrifugo должна быть частью нормальной backend-архитектуры, а не случайным HTTP-вызовом из контроллера. Laravel меняет состояние, создаёт событие приложения, listener передаёт задачу в очередь, queue job после commit публикует минимальный payload в Centrifugo, а frontend получает real-time обновление через WebSocket.

Такой подход сохраняет правильные границы ответственности. Laravel остаётся источником истины и владельцем бизнес-логики. Centrifugo остаётся транспортным real-time слоем. Frontend получает события и обновляет интерфейс, но при необходимости восстанавливает состояние через обычный HTTP API.

Для Laravel-проектов с заказами, платежами, уведомлениями, чатами и административными панелями это наиболее устойчивый способ внедрить real-time без разрушения архитектуры. Real-time должен ускорять интерфейс, а не превращать backend в транспортную свалку.