Floxy — лёгкий Saga Workflow Engine на Go
- пятница, 28 ноября 2025 г. в 00:00:08
Большинство современных систем — это не просто код, выполняющий запросы, а последовательности действий, которые должны выполняться атомарно и восстанавливаться при сбое. Речь идёт не о бизнес-логике в пределах одной функции, а об оркестрации процессов: цепочках шагов, где каждая операция может завершиться ошибкой, требующей компенсации.
Такую задачу решает паттерн Saga — один из самых сложных и важных архитектурных паттернов. Он описывает, как выполнить серию распределённых операций с возможностью отката (rollback), не прибегая к глобальным транзакциям.
Примечание: движок пока не проходил испытаний в продакшене и не является production-ready решением. В настоящий момент проект активно тестируется и оптимизируется. Цель статьи — не заявить о production-ready платформе, а показать архитектуру Saga-движка, который можно встроить в Go-приложение без инфраструктурных накладных расходов. Если вы хотите поэкспериментировать с workflow-движком или поучаствовать в обкатке, буду рад обратной связи.
Реализация оркестрации вручную обычно быстро превращается в хаос. Ошибки приходится обрабатывать каскадно, rollback логика размазывается по коду, а попытки добавить пользовательское подтверждение или параллельные ветки делают систему непредсказуемой.
С другой стороны, есть зрелые платформы вроде Temporal или Cadence. Они надёжны, но требуют развертывания целой инфраструктуры: брокеров, воркеров и делают простой процесс зависимым от внешней экосистемы.
Между этими крайностями и появился Floxy — встраиваемая библиотека на Go, реализующая Saga-паттерн с оркестрацией, компенсациями и интерактивными шагами, без внешних сервисов и тяжёлого runtime.
Небольшое отступление. Workflow-подход — это не просто способ «красивее организовать бизнес-логику». Это архитектурный инструмент, который становится необходимым, когда простые машины состояний перестают справляться. Ниже — ключевые сценарии, в которых использование workflow приносит максимальную пользу.
Когда процесс — это не переходы между статусами, а цепочка зависимых операций:
вызовы внешних сервисов;
проверка ограничений;
резервирование ресурсов;
подтверждение пользователя;
пост-обработка результатов.
В таких случаях «статус» показывает только положение, но не само поведение процесса, а логика расползается по сервисам, cron-джобам и хендлерам. Workflow позволяет описать последовательность шагов как структуру, которую движок выполняет и контролирует.
Шаг N может завершиться ошибкой, требующей отмены шагов 1...N−1:
снять резерв товара;
вернуть деньги;
удалять созданные записи;
отменить отправку данных в сторонние сервисы.
Ручная реализация компенсаций быстро превращается в хаос. Workflow/Saga обеспечивает централизованную и предсказуемую обработку откатов. Floxy делает это без брокеров, внешних рантаймов и тяжёлой инфраструктуры.
В распределённых системах типичны проблемы:
повторное выполнение шагов;
гонки за состояние;
«зависшие» сущности;
дубли запросов.
Workflow фиксирует состояние каждого шага и позволяет движку безопасно выполнять повторы, используя уникальный для каждого шага idempotency key. Разработчик концентрируется на бизнес-логике, а не на defensive programming.
Процессы с:
условными ветками,
параллельной обработкой (Fork),
синхронизацией (Join),
несколькими путями завершения.
В статусных моделях это приводит к бесконечным промежуточным состояниям и трудно отслеживаемой логике. Workflow выражает это декларативно.
Если процесс должен:
ждать подтверждения пользователя,
проходить ручную модерацию,
зависеть от действий человека или внешнего сервиса,
то статус «WAITING_APPROVAL» ничего не гарантирует — кроме того, что кто-то когда-то должен изменить его на «APPROVED». Workflow фиксирует точку остановки и корректно продолжает процесс после подтверждения.
Когда выполнение занимает:
минуты,
часы,
дни,
требует ожидания событий извне,
workflow — единственный способ надёжно хранить прогресс, восстанавливаться после сбоев и контролировать время выполнения.
Когда бизнес-процессы меняются со временем:
добавляются шаги;
изменяются условия;
меняются компенсации.
Статусная модель превращается в клубок условий «если объект старого формата». Workflow позволяет запускать старые и новые процессы параллельно, используя разные версии схемы.
Если важно:
видеть граф процесса;
понимать, где он остановился;
разбираться, какие шаги выполнялись;
анализировать ошибки.
Workflow делает историю частью модели. Статусы дают лишь текущий срез, лишая контекста.
Используйте workflow, а не статусную модель, когда процесс:
сложный, ветвистый или многошаговый;
требует компенсаций при сбое;
должен гарантировать идемпотентность;
включает внешних участников или ручные действия;
может выполняться долго;
требует возобновления после сбоев;
должен быть наблюдаемым и диагностируемым;
должен иметь версионирование.
В общем, в подобных ситуациях стоит не плодить статусы внутри сущностей, а вынести процесс в отдельные таблицы и управлять им через workflow. Это упрощает логику и делает систему надёжнее.
Floxy опирается на простую идею: workflow — это часть программы, а не отдельный сервис.
Вместо выделенной платформы с RPC и брокерами Floxy предлагает библиотеку, в которой бизнес-процесс описывается с помощью обычного кода Go — без нового языка или YAML-файлов.
Основные принципы:
Минимализм. Всё строится вокруг context.Context, pgx и простых структур данных.
Предсказуемость. Любое состояние хранится в PostgreSQL; поведение детерминировано.
Изоляция. Все таблицы создаются в схеме workflows, не мешая основной схеме базы данных.
Оркестрация как библиотека. Saga, retry, rollback и human-in-the-loop доступны без внешнего runtime.
Версионирование. Каждый workflow-шаблон имеет номер версии, обеспечивая безопасное развитие процессов.
Floxy реализует полный набор функций для построения надёжных оркестраций:
Saga с оркестрацией и компенсациями. Каждый шаг может иметь OnFailure-обработчик, выполняющий откат или компенсацию.
SavePoint. Частичный rollback до последней сохранённой точки.
Conditional steps. Ветвления логики с помощью Go templates — без внешнего DSL.
Parallel / Fork / Join. Параллельные ветви выполнения и последующая синхронизация.
Human-in-the-loop. Поддержка шагов, требующих участия человека (confirm, reject).
Cancel и Abort. Мягкая отмена или немедленное завершение workflow.
Dead Letter Queue. Режим DLQ с приостановкой workflow и ручным восстановлением.
Idempotency-aware шаги. Контекст выполнения (StepContext) предоставляет метод IdempotencyKey(), помогающий разработчикам реализовать безопасные операции.
Миграции встроены через go:embed. Floxy полностью самодостаточен и имеет функцию применения миграций.
Floxy — это библиотека с простыми, но выразительными абстракциями:
Store — слой хранения шаблонов, инстансов шаблонов, состояний и событий (PostgreSQL через pgx).
Builder — билдер workflow шаблонов
Engine — исполнитель и координатор шагов: планирует, откатывает, повторяет, синхронизирует.
Worker Pool — фоновый пул, обрабатывающий очередь шагов.
Каждый шаг выполняется в контексте (context.Context), а фоновый воркер проверяет таблицу workflow_cancel_requests, чтобы своевременно прерывать long-running шаги.
Workflow в Floxy — это ориентированный ациклический граф шагов (DAG), определяемый через встроенный Builder API.
Builder формирует структуру в виде списка смежности, проверяет наличие циклов и сериализует описание в JSON для хранения в workflow_definitions.
wf, _ := floxy.NewBuilder("order", 1).
Step("reserve_stock", "stock.Reserve").
Then("charge_payment", "payment.Charge").
OnFailure("refund", "payment.Refund").
Step("send_email", "notifications.Send").
Build()Если Builder обнаруживает цикл, Build() возвращает ошибку — проверяя корректность графа ещё до запуска флоу в движке.
Каждый шаблон workflow хранится с номером версии. При обновлении шаблона разработчику необходиму увеличить номер версии. Таким образом, запущенные экземпляры продолжают выполнение по своей оригинальной схеме.
Все таблицы Floxy находятся в отдельной схеме workflows, включая таблицы workflow_instances, workflow_steps, workflow_events и workflow_definitions и другие. Это обеспечивает полную изоляцию и упрощает интеграцию в существующие приложения.
Floxy поддерживает интерактивные шаги (StepTypeHuman), которые приостанавливают выполнение и ждут решения пользователя.
Workflow переходит в состояние waiting_decision, а решение (confirmed или rejected) записывается в таблицу workflow_human_decisions. После этого движок либо продолжает выполнение, либо завершает процесс с ошибкой.
Таким образом, Floxy может использоваться не только для автоматических процессов, но и для сценариев с подтверждением, ревью или ручным контролем.
Floxy поддерживает два механизма остановки:
Cancel — выполняет rollback до корня (save points игнорируются),
Abort — немедленно прерывает выполнение без компенсации.
Оба варианта инициируются добавлением записи в таблицу workflow_cancel_requests. Фоновый воркер периодически опрашивает её и вызывает context.CancelFunc() для активных шагов соответствующего инстанса.
Floxy поддерживает два различных режима обработки ошибок:
Классический режим Saga (по умолчанию): при сбое шага движок выполняет откат к последней точке сохранения и запускает обработчики компенсаций.
Режим DLQ: откат отключен, рабочий процесс приостанавливается в состоянии dlq, а неудавшиеся шаги сохраняются в DLQ для ручного анализа.
Когда DLQ включен для рабочего процесса:
Обработчики компенсаций не выполняются.
Рабочий процесс приостановлен: workflow переходит в статус dlq (не терминальный, может быть возобновлен).
Активные шаги заморожены: все запущенные шаги переведены в состояние паузы.
Очередь очищена: очередь экземпляра очищается для предотвращения дальнейшего выполнения.
Ручное восстановление: после устранения проблем используйте RequeueFromDLQ для возобновления.
Ниже представлена верхнеуровневая диаграмма последовательностей работы движка:
Диаграммы с подробным разбором работы движка есть в директории docs репозитория floxy.

Floxy покрыт большим количеством unit- и интеграционных тестов, которые используют testcontainers для автоматического запуска PostgreSQL в контейнере. Это позволяет проверять корректную работу движка во всех сценариях — от простых последовательных флоу до сложных параллельных и компенсационных процессов.
Дополнительно для проекта реализовано хаос-тестирование, позволяющее проверять устойчивость движка к задержкам, сбоям и непредсказуемому поведению окружения. Примеры таких тестов можно найти в отдельном репозитории: https://github.com/floxy-project/floxy-stress-test. Тестирование сделано при помощи фреймворка ChaosKit, специально разработанного для chaos-тестирования floxy (фреймворк общего назначения, но идея его создать была изначально именно с целью тестирования workflow движка).
Кроме того, в репозитории размещено множество примеров (./examples), которые демонстрируют различные типы шагов, использование OnFailure, ветвления, условия, human-in-the-loop сценарии и rollback-политику. Это делает вход в проект простым и наглядным даже для новичков в Go.
Помимо этого, репозиторий оснащен большим количеством документации и PlantUML диаграммами, позволяющими детально понять процесс работы движка.
Floxy не использует брокеров, RPC или внешние демоны. Он работает полностью внутри процесса приложения, опираясь только на PostgreSQL и стандартные пакеты Go и pgx:
pgx — быстрый драйвер и пул соединений;
context — управление временем жизни операций;
net/http — REST API через новый ServeMux;
go:embed — встроенные миграции и схемы.
Несмотря на наличие фоновых воркеров и планировщика, Floxy остаётся библиотекой, а не платформой — без отдельных бинарников или RPC протоколов.
engine := floxy.NewEngine(pgxPool)
defer engine.Shutdown()
wf, _ := floxy.NewBuilder("order", 1).
Step("reserve_stock", "stock.Reserve").
Then("charge_payment", "payment.Charge").
OnFailure("refund", "payment.Refund").
Step("send_email", "notifications.Send").
Build()
engine.RegisterWorkflow(ctx, wf)
engine.RegisterHandler(&ReserveStock{})
engine.RegisterHandler(&ChargePayment{})
engine.RegisterHandler(&RefundPayment{})
engine.RegisterHandler(&Notifications{})
workerPool := floxy.NewWorkerPool(engine, 3, 100*time.Millisecond)
workerPool.Start(ctx)
instanceID, err := engine.Start(ctx, "order-v1", input)Floxy развивается как полноценная экосистема инструментов для всех стадий работы с workflow.
Floxy Pro — расширенная версия библиотеки, созданная для больших объемов данных. Она добавляет:
Partitioned Tables с pg_partman. Все основные таблицы (workflow_instances, workflow_steps, workflow_events, workflow_dlq) разбиваются на партиции по created_at с ежедневными секциями. Это обеспечивает автоматическое управление, быструю очистку старых данных и высокий уровень производительности при больших объёмах записей.
Автоматическое управление партициями. pg_partman создаёт новые секции на 30 дней вперёд и удаляет старые (90-дневное хранение) — без ручного вмешательства.
floxyctl. CLI-инструмент для запуска, отладки и управления workflow-инстансами. Поддерживает два режима:
In-memory — выполнение YAML-описанных workflow без БД (подходит для тестов, CI/CD и скриптов).
Database Mode — управление workflow-инстансами в PostgreSQL: запуск, отмена (cancel), аварийное завершение (abort).
floxyctl особенно полезен в DevOps и CI/CD pipelines, где шаги описываются в YAML и выполняются через bash-скрипты или HTTP-запросы. Это позволяет использовать Floxy как workflow engine, даже если основное приложение написано не на Go (например, Ruby, Python или Node.js) — шаги могут вызываться через HTTP-хендлеры.
floxyd. Демон непрерывной обработки workflow (для других языков программирования). Работает как долговременный сервис с конфигурируемым пулом воркеров, polling-интервалами и статистикой выполнения. Поддерживает Bash и HTTP-хендлеры, TLS, health checks и Prometheus metrics. floxyd подходит для постоянной эксплуатации workflow-процессов.
Отличие floxyctl и floxyd:
Характеристика | floxyctl | floxyd |
Режим работы | CLI | Долговременный демон |
Хранилище | In-memory или PostgreSQL | Только PostgreSQL |
Назначение | Тесты, локальные запуски, CI/CD | Тесты, локальные запуски, CI/CD, долговременные процессы |
Выполнение | Одноразовое | Непрерывное |
Floxy UI — веб-интерфейс для визуализации и управления процессами, больше подходит для dev-среды.
Плагины для GoLand и VS Code предоставляют визуализацию workflow по Go-коду: автоматически строят PlantUML или Mermaid-диаграммы, что удобно для документирования и ревью бизнес-процессов.
Так выглядит отображение workflow в GoLand с плагином:

Таким образом, Floxy — это уже не просто OpenSource библиотека, а цельная экосистема инструментов, позволяющая проектировать, визуализировать и выполнять сложные workflow в распределённых системах.
Если вы дочитали до этого места, то, вероятно, вам интересны не только возможности Floxy, но и то, как всё это работает внутри. Значит, вы входите в ту часть аудитории, которая любит разбираться в механизмах и архитектурных решениях — и для вас подготовлен расширенный раздел.
Далее мы разберём принципы исполнения и те решения, которые определили дизайн библиотеки, которые обычно остаются «за кадром» библиотечной абстракции.
Если же вам достаточно обзорной части и вы хотите перейти сразу к итогам — можете смело перемещаться к Заключению или ознакомиться непосредственно с проектом по ссылке на GitHub: github.com/floxy-project/floxy.
Философия: Минимизация инфраструктурных зависимостей.
Большинство workflow-движков требуют комбинацию из нескольких компонентов: Redis или RabbitMQ для очереди задач, PostgreSQL для хранения состояния, отдельный сервис для распределенных блокировок. Floxy использует только PostgreSQL для всех этих задач.
Это упрощает развертывание: вместо 3-4 сервисов нужна только одна база данных. Меньше точек отказа, проще процедуры резервного копирования и восстановления.
Механизм очереди:
-- Извлечение задачи с учетом приоритетов и времени ожидания (priority aging)
WITH next_item AS (
SELECT id
FROM workflows.workflow_queue
WHERE scheduled_at <= NOW() AND attempted_at IS NULL
ORDER BY
LEAST(100, priority + FLOOR(EXTRACT(EPOCH FROM (NOW() - scheduled_at)) * 0.5)) DESC,
scheduled_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE workflows.workflow_queue
SET attempted_at = NOW(), attempted_by = $workerID
FROM next_item
WHERE workflows.workflow_queue.id = next_item.id
RETURNING *Хранение состояния:
workflow_instances — жизненный цикл workflow
workflow_steps — история выполнения шагов
workflow_events — журнал аудита
workflow_join_state — координация fork/join конструкций
Распределенные блокировки:
FOR UPDATE SKIP LOCKED вместо блокировок в Redis
attempted_by + attempted_at для отслеживания владельца задачи
Автоматическое освобождение через ReleaseQueueItem()
Единая точка конфигурации
ACID-гарантии из коробки
Упрощенный мониторинг (один источник данных)
Встроенные механизмы резервного копирования
Меньше сетевых задержек (нет межсервисных вызовов)
PostgreSQL становится узким местом при очень высоких нагрузках
Требуется правильная настройка пула соединений
Обслуживание индексов критично для производительности
Философия: Каждый шаг — атомарная единица работы.
func (engine *Engine) ExecuteNext(ctx context.Context, workerID string) error {
return engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
// 1. Извлечение из очереди
item := store.DequeueStep(ctx, workerID)
// 2. Выполнение шага
output, err := handler.Execute(ctx, stepCtx, input)
// 3. Обновление состояния
store.UpdateStep(ctx, stepID, status, output, errMsg)
// 4. Постановка следующих шагов в очередь
store.EnqueueStep(ctx, instanceID, nextStepID, priority, delay)
// 5. Удаление из очереди
store.RemoveFromQueue(ctx, queueID)
// Все или ничего
return nil
})
}Консистентность: Если обработчик упал — происходит откат транзакции, и шаг остается в очереди. Если база данных недоступна — никаких частичных обновлений. Невозможна ситуация, когда шаг помечен выполненным в базе, но следующий шаг не поставлен в очередь.
Идемпотентность:
// Повторное выполнение безопасно благодаря IdempotencyKey
step := &WorkflowStep{
IdempotencyKey: uuid.NewString(), // уникален для каждой попытки
RetryCount: 0,
}
// При повторе того же шага:
// - Тот же IdempotencyKey
// - RetryCount увеличивается
// - Обработчик может проверить дедупликациюВосстановление: Падение воркера приводит к автоматическому откату транзакции. Элемент очереди освобождается, и другой воркер может его забрать. Нет "осиротевших" состояний.
// ReadCommitted для операций
txManager.ReadCommitted(ctx, func(ctx context.Context) error {
// Видим только зафиксированные данные
// Минимальная конкуренция за блокировки
})Что внутри транзакции:
Извлечение шага из очереди с захватом блокировки на уровне строки
Загрузка определения workflow
Выполнение обработчика
Обновление статуса шага
Уведомление join-узлов
Постановка следующих шагов в очередь
Логирование событий
Что снаружи:
Выполнение обработчика может делать внешние вызовы (API и т.д.)
Но изменение состояния происходит только после успешного возврата
Внешние побочные эффекты не откатываются. Решение: идемпотентность в обработчиках плюс компенсация.
Накладные расходы на транзакцию для каждого шага. Решение: пулинг соединений и воркеров.
Философия: Используем эксклюзивные блокировки, но избегаем ожидания их освобождения.
-- Неблокирующие пессимистичные блокировки (оптимально):
SELECT * FROM queue
WHERE status = 'pending'
LIMIT 1
FOR UPDATE SKIP LOCKED; -- Пропускаем заблокированные строки
-- Воркер A берет и блокирует строку 1
-- Воркер B видит что строка 1 заблокирована, пропускает её, берет строку 2
-- Воркер C видит что строки 1,2 заблокированы, пропускает их, берет строку 3
-- Максимальный параллелизмSKIP LOCKED не делает пессимистичную блокировку оптимистичной. Это оптимизация поведения при конкуренции: блокировка остается пессимистичной (эксклюзивной), но обработка конкуренции становится неблокирующей.
Результат: преимущества обоих подходов — нет конфликтов (как у пессимистичных) и нет ожидания (как у оптимистичных), что дает максимальную пропускную способность.
Сценарий 1: Нормальная работа
Воркер A: DequeueStep() — получает step_1 (заблокирован)
Воркер B: DequeueStep() — пропускает step_1, получает step_2
Воркер C: DequeueStep() — пропускает step_1,2, получает step_3Сценарий 2: Падение воркера
Воркер A: взял step_1, упал
Откат транзакции
Воркер B: DequeueStep() — получает step_1 (уже разблокирован)Сценарий 3: Cancel workflow
Воркер A: выполняет step_1 (заблокирован)
Воркер B: CancelWorkflow() — GetActiveStepsForUpdate()
— пропускает step_1 (заблокирован воркером A)
— останавливает другие шаги
Воркер A: завершает выполнение — проверяет отмену — откатываетсяФилософия: Доступность расширения через хуки.
type Plugin interface {
Name() string
}
type WorkflowStartPlugin interface {
BasePlugin
OnWorkflowStart(ctx context.Context, instance *WorkflowInstance) error
}
type StepCompletePlugin interface {
BasePlugin
OnStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error
}
// и так далее для других хуковУровень workflow:
OnWorkflowStart — перед первым шагом
OnWorkflowComplete — после успешного завершения
OnWorkflowFailed — при неудаче workflow
Уровень шага:
OnStepStart — перед выполнением обработчика
OnStepComplete — после успешного шага
OnStepFailed — при неудаче шага
Уровень отката:
OnRollbackStepChain — при компенсации
// В engine.executeStep():
func (engine *Engine) executeStep(...) error {
// 1. Хук плагина ДО
if engine.pluginManager != nil {
if err := engine.pluginManager.ExecuteStepStart(ctx, instance, step); err != nil {
return fmt.Errorf("plugin hook failed: %w", err)
}
}
// 2. Основная логика
output, err := handler.Execute(ctx, stepCtx, input)
// 3. Хук плагина ПОСЛЕ
if err != nil {
engine.pluginManager.ExecuteStepFailed(ctx, instance, step, err)
} else {
engine.pluginManager.ExecuteStepComplete(ctx, instance, step)
}
}Плагин метрик:
type MetricsPlugin struct {}
func (p *MetricsPlugin) OnStepStart(ctx, instance, step) error {
metrics.Counter("floxy.step.started", tags("workflow", instance.WorkflowID))
return nil
}
func (p *MetricsPlugin) OnStepComplete(ctx, instance, step) error {
duration := step.CompletedAt.Sub(*step.StartedAt)
metrics.Histogram("floxy.step.duration", duration)
return nil
}Плагин аудита:
type AuditPlugin struct {
logger *slog.Logger
}
func (p *AuditPlugin) OnWorkflowStart(ctx, instance) error {
p.logger.Info("workflow started",
"instance_id", instance.ID,
"workflow", instance.WorkflowID,
"input", instance.Input)
return nil
}type PluginManager struct {
plugins []Plugin
}
func (pm *PluginManager) ExecuteStepStart(...) error {
for _, plugin := range pm.plugins {
if p, ok := plugin.(StepStartPlugin); ok {
if err := p.OnStepStart(ctx, instance, step); err != nil {
// Логируем, но не прерываем весь workflow
slog.Warn("plugin failed", "plugin", plugin.Name(), "error", err)
}
}
}
return nil
}На момент написания статьи были реализованы следующие плагины:
для REST API:
cancel - endpoint для отмены выполнения workflow
abort - endpoint для аварийного завершения workflow
cleanup - endpoint для очистки устаревших инстансов workflow
dlq - endpoint для возврата workflow, находящегося в DQL, с измененным input
human-decision - endpoint для подтверждения или отклонения workflow, ожидающего решения от пользователя
для движка:
metrics - сбор prometheus метрик
telemetry - сбор телеметрии OpenTelemetry\Jaeger
notifications - отправка нотификаций при изменении состояния workflow\шага
validate - применение кастомных правил валидации к input данным шагов
audit - запись аудит-лога
rollback-depth - отслеживание глубины выполнения rollback, реализован специально для chaos-тестирования
Преимущества:
Ядро остается не перегруженным и понятным
Каждая функция — opt-in через плагин
Легко тестировать ядро без плагинов
Сторонние интеграции без форка кода
Без плагинов: Пришлось бы добавлять в ядро: метрики, логирование, webhook'и, circuit breakers... Ядро раздувается, появляется связность с внешними сервисами.
С плагинами: Ядро содержит только логику выполнения workflow. Всё остальное — через плагины. Можно комбинировать в конструкторе.
Философия: Воркеры не хранят состояние, всё в PostgreSQL.
type Engine struct {
// Разделяемое состояние (read-only после инициализации):
txManager TxManager
store Store
handlers map[string]StepHandler // Реестр, не меняется
pluginManager *PluginManager
cancelContexts map[int64]map[int64]context.CancelFunc // Отмена в памяти
// Чего нет:
// - состояние workflow
// - прогресс шагов
// - элементы очереди
// - состояние join-узлов
}Горизонтальное масштабирование:
PostgreSQL
├── Воркер 1 (цикл ExecuteNext)
├── Воркер 2 (цикл ExecuteNext)
├── Воркер 3 (цикл ExecuteNext)
└── Воркер N (цикл ExecuteNext)Каждый воркер:
DequeueStep() — берет работу
Выполняет обработчик
Обновляет состояние
Повторяет цикл
Координация не нужна:
Нет выбора лидера
Нет синхронизации состояния между воркерами
Нет gossip-протокола
PostgreSQL — единственная точка синхронизации
Проблема:
У воркера A есть обработчик "sendEmail"
У воркера B нет обработчика "sendEmail"
Как избежать ошибок?
Решение: задержка при отсутствии обработчика
func (engine *Engine) executeStep(...) error {
// Проверяем, зарегистрирован ли обработчик локально
engine.mu.RLock()
_, hasHandler := engine.handlers[stepDef.Handler]
engine.mu.RUnlock()
if !hasHandler {
// Освобождаем элемент очереди для другого воркера
delay := engine.jitteredCooldown() // 1s +/- 20%
store.RescheduleAndReleaseQueueItem(ctx, queueID, delay)
// Логирование с троттлингом
if engine.shouldLogSkip(logKey) {
store.LogEvent(ctx, instanceID, nil, EventStepSkippedMissingHandler, ...)
}
return nil // Не помечаем шаг как failed
}
// Обработчик есть — выполняем
return handler.Execute(ctx, stepCtx, input)
}Как работает:
t=0: Воркер A извлекает шаг "sendEmail"
— нет обработчика
— откладывает на t=1s
— освобождает
t=1s: Воркер B извлекает шаг "sendEmail"
— есть обработчик
— выполняетЗадержка с джиттером:
func (engine *Engine) jitteredCooldown() time.Duration {
base := 1 * time.Second
jitterPct := 0.2 // +/- 20%
// Случайное значение [-0.2, +0.2]
delta := (rand.Float64()*2 - 1) * jitterPct
// 1s * (1 +/- 0.2) = [0.8s, 1.2s]
return time.Duration(float64(base) * (1 + delta))
}Зачем джиттер? Избегаем эффекта thundering herd:
Без джиттера:
t=0: 10 воркеров пытаются взять шаг
t=1s: все 10 снова пытаются одновременно
С джиттером:
t=0: 10 воркеров пытаются взять шаг
t=0.8-1.2s: воркеры повторяют попытку в разное времяСценарий:
t=0: Воркер A извлекает step_1
BEGIN TRANSACTION
Выполняет обработчик... (занимает 5 секунд)
t=3: Воркер A падает
t=3: PostgreSQL: откат транзакции
attempted_at = NULL
attempted_by = NULL
t=4: Воркер B извлекает step_1
Начинает зановоТребование идемпотентности:
// Обработчик должен быть готов к повтору
type Handler interface {
Execute(ctx, stepCtx, input) (output, error)
}
// stepCtx.IdempotencyKey() можно использовать для дедупликацииЗадача:
Воркер A выполняет шаг
Воркер B хочет отменить workflow
Как синхронизировать?
Решение: таблица запросов отмены + фоновый воркер
// Воркер B:
func (engine *Engine) CancelWorkflow(ctx, instanceID, ...) error {
// Просто создаем запись
store.CreateCancelRequest(ctx, &WorkflowCancelRequest{
InstanceID: instanceID,
CancelType: CancelTypeCancel,
})
// Не ждем завершения
}
// Воркер A (фоновая горутина):
func (engine *Engine) cancelRequestsWorker() {
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
engine.processCancelRequests()
}
}
}
func (engine *Engine) processCancelRequests() {
// Проверяем активные workflow
for _, instanceID := range engine.cancelContexts {
req := store.GetCancelRequest(ctx, instanceID)
if req != nil {
// Отменяем контекст для этого instance
cancelFunc()
}
}
}В цикле выполнения:
func (engine *Engine) executeStep(...) error {
// Регистрируем контекст отмены
handlerCtx, cancel := context.WithCancel(ctx)
engine.registerInstanceContext(instanceID, stepID, cancel)
defer engine.unregisterInstanceContext(instanceID, stepID)
// Выполняем
output, err := handler.Execute(handlerCtx, stepCtx, input)
// Проверяем, была ли отмена во время выполнения
if errors.Is(handlerCtx.Err(), context.Canceled) {
req := store.GetCancelRequest(ctx, instanceID)
if req != nil {
return engine.handleCancellation(...)
}
}
}Естественная балансировка через очередь:
Нет sticky-маршрутизации
Нет назначения партиций
Каждый воркер просто берет следующий доступный элемент
Обработка приоритетов:
ORDER BY priority DESC, scheduled_at ASCЭлементы с высоким приоритетом обрабатываются первыми, но все воркеры видят одну и ту же очередь.
Priority Aging:
LEAST(100, priority + FLOOR(wait_seconds * 0.5))Элементы с низким приоритетом постепенно поднимаются, что предотвращает голодание.
Blue-Green:
СТАРЫЕ: Воркеры 1-5 с обработчиками {A, B, C}
НОВЫЕ: Воркеры 6-10 с обработчиками {A, B, C, D}
Развертывание НОВЫЕ — оба пула работают параллельно
Остановка СТАРЫЕ — плавное завершениеCanary:
90% трафика — СТАРЫЕ воркеры
10% трафика — НОВЫЕ воркеры (с новой версией обработчика)
Мониторинг ошибок — если OK, увеличиваем до 100%Rolling:
Остановка Воркер 1 — Развертывание новой версии — Запуск Воркер 1
Остановка Воркер 2 — Развертывание новой версии — Запуск Воркер 2
...Преимущества:
Простое масштабирование (просто добавь воркеров)
Нет накладных расходов на синхронизацию состояния
Автоматическое восстановление после падений
Нет сценариев split-brain
Ограничения: PostgreSQL — единственная точка конкуренции. Решения: реплики для чтения, партиционирование для очень больших нагрузок.
Реестр обработчиков должен быть согласованным. Решения: версионирование определений workflow, плавное устаревание обработчиков.
Параллельные ветви (Fork/Join) создают классическую гонку: один шаг уже успел завершиться, а другой в параллельной ветке упал и инициировал rollback. Если следующий шаг уже был взят воркером до того, как движок попытался остановить ветку — он всё равно выполнится, что приводит к неконсистентному состоянию (workflow в статусе failed, но есть шаг в статусе completed).
Floxy решает проблему через стратегию Defense in Depth — сочетание двух уровней защиты:
Сразу после failure движок:
находит соответствующий fork,
помечает все pending шаги параллельных веток как skipped,
останавливает их до начала rollback.
Этот механизм снижает вероятность гонки, но не устраняет её полностью — воркер мог уже начать выполнение шага.
Если несмотря на превентивную остановку какие-то шаги всё же успели завершиться, движок выполняет «реактивный откат»:
определяет последний savepoint,
находит все completed шаги, созданные после него,
ставит их компенсации в очередь (в обратном порядке),
завершает workflow только после выполнения компенсаций.
Таким образом, движком достигается eventual consistency.
Для вложенных Fork внутри Fork выполнение rollback на данный момент не поддерживается.
Стоит подчеркнуть, что проблема «overshoot completion» (когда шаг успевает завершиться после того, как другая ветка уже упала и инициировала rollback) — это фундаментальное свойство всех распределённых workflow-систем:
после выдачи шага worker-у его невозможно гарантированно остановить;
между моментом планирования шага и фактическим исполнением всегда есть временной интервал;
шаг может завершиться «слишком поздно», уже после начала rollback;
корректная модель — выполнение компенсаций post-factum.
Именно поэтому двухуровневый rollback (Preventive + Reactive) — это не обходной путь, а корректная реакция на неизбежные гонки исполнения.
Ключевые принципы:
Простота > Возможности: Один PostgreSQL вместо множества сервисов
Транзакции > Координация: ACID-гарантии вместо распределенного консенсуса
Неблокирующие операции > Блокировки: SKIP LOCKED вместо очередей ожидания
Композиция > Монолит: Плагины вместо раздутого ядра
Без состояния > С состоянием: Управление через базу данных вместо координации в памяти
Многоуровневая защита > Атомарный контроль: Превентивные и реактивные механизмы обеспечивают корректность Fork/Join при неизбежных гонках
Эти принципы обеспечивают:
Простую операционную модель
Предсказуемое поведение
Легкую отладку
Надежное восстановление
Floxy решает ту же задачу, что и крупные оркестраторы, но с философией библиотечного подхода, свойственной Go: минимум абстракций, максимум контроля.
Он реализует Saga-паттерн с оркестрацией, поддерживает компенсации, условия, параллелизм и интерактивные шаги — при этом остаётся лёгким, прозрачным и встраиваемым.
github.com/floxy-project/floxy
P.S. Статья и так получилась достаточно объёмной, поэтому многие детали «внутренней кухни» остались за рамками. Если вам интересно углубиться в архитектуру Floxy, я подготовлю отдельный материал — о том, как реализованы Fork/Join, как определяется набор терминальных шагов для синхронизации в Join, как устроен rollback и другие внутренние механизмы.