javascript

Как работает код, который спит месяц

  • суббота, 9 марта 2024 г. в 00:00:16
https://habr.com/ru/companies/ruvds/articles/798437/

В первой части этого небольшого цикла статей мы говорили о том, что механизм устойчивого выполнения (durable execution) сохраняет состояние программы в журнале, а также о связанных с этим сложностях в случае обновлений служебного кода, ведущих к утрате журналом актуальности. Мы увидели, что ограничение времени выполнения обработчика существенно облегчает эту проблему. Но… не ведёт ли это к потере одного из наиболее интересных свойств устойчивого выполнения — возможности создавать бизнес-процессы, работающие с длительными паузами? В Restate мы считаем, что при использовании правильных примитивов можно ничего не потерять.

Тем не менее, если вы любите писать код с долгими периодами ожидания, потому что он хорошо согласуется с вашей моделью мышления, то Restate поможет вам реализовать это в полной мере. Если же вы цените устойчивое выполнение, но скептично относитесь к долго выполняющимся обработчикам и проблемам с их версионированием, то для этого есть решение. Ниже описаны несколько способов получить те же свойства путём добавления в этот механизм устойчивого обмена сообщениями и состояния.

▍ Приостановка обработчика


Такая простая задача, как отправка вашим пользователям электронного письма через месяц после их регистрации, как правило, оказывается на удивление сложной. Очевидно, что вы не можете сделать это во время запроса на регистрацию, который должен завершиться. Пожалуй, здесь потребуется записать своё намерение в базу данных или очередь и вернуться к нему позже. А для этого уже нужно создать cronjob или потребителя. По факту же вам нужно просто вызвать почтовую службу, но сообщить ей «выполни это через месяц».

Инструменты вроде Temporal и Durable Functions предоставляют вам абстракцию «handler suspension» (приостановка обработчика). Она позволяет создавать обработчики, которые будут выполнять запланированную работу через, например, месяц ожидания. То есть в описанном выше случае с отправкой письма вам нужно будет просто запустить обработчик во время запроса на регистрацию, сопроводив его параметром задержки. В итоге, если выполнение начнётся, через месяц запланированное письмо будет отправлено. В Restate этот шаблон будет выглядеть так:

const emailService = restate.router({
  email: async (ctx: restate.RpcContext, request: { email: string, delay: number }) => {
    // добавление новых шагов до этапа засыпания создаст сложности
    await ctx.sleep(request.delay)
    await ctx.sideEffect(() => ses.sendEmail(...))
  }
});
const emailApi: restate.ServiceApi<typeof emailService> = { path: "email" };

Присутствие активного запроса в течение месяца — не идеальное решение. В этом случае при желании изменить код обработчика почты до вызова ctx.sleep — например, чтобы добавить проверку валидности письма — потребуется сохранить старую версию до конца месяца, пока существующие запросы в ней не завершатся, так как в новой версии они провалятся. Это означает, что любая функциональность, добавляемая до команды sleep, не возымеет полного эффекта до истечения месяца. Кроме того, станет очень трудно применять срочные патчи безопасности, поскольку потребуется устанавливать их для всех старых версий, которые могут быть вызваны.

Какое же свойство нам здесь действительно нужно? Как уже говорилось, нам просто нужно отправить к почтовому сервису запрос, который должен быть обработан через месяц. Тогда зависшим в ожидании «активным» элементом окажется не журнал частично законченного рабочего потока, возможно, со множеством шагов, а одно сообщение запроса. Версионировать же запрос уже гораздо проще. Достаточно вносить новые параметры как опциональные, не удаляя существующие. Также можно использовать Protocol Buffers, чтобы наладить проверку обратной совместимости. Выглядеть это будет так:

const emailService = restate.router({
  delayedEmail: async (ctx: restate.RpcContext, request: { email: string, delay: number }) => {
    // добавление сюда новых шагов не повлияет на активные запросы
    ctx.sendDelayed(emailApi, request.delay).send({ email: request.email })
  }
  // недолго выполняющийся обработчик
  email: async (ctx: restate.RpcContext, request: { email: string }) => {
    await ctx.sideEffect(() => ses.sendEmail(...))
  }
});
const emailApi: restate.ServiceApi<typeof emailService> = { path: "email" };

Фишка в том, чтобы делать большие паузы не внутри вызовов, а между, версионируя запрос по необходимости. Мы перешли от приостанавливаемой абстракции обработчика к приостанавливаемой абстракции RPC. Restate здесь играет роль устойчивой шины событий, а также устойчивого исполнителя.

▍ Циклы управления


А что, если вам нужно написать рабочий поток для цикла управления, подразумевающий выполнение набора задач каждый час, возможно, до бесконечности (или пока не будет достигнут некий установленный предел истории)? Многие используют рабочие потоки таким образом. Это упрощает понимание и налаживание долгоживущих циклов управления. В таком случае для версионирования нужно остановить все циклы, провести обновление и заново их запустить. Но это может стать очень трудоёмкой задачей при наличии тысячи активных циклов. В коде это выглядит так:

const controlService = restate.router({
  loop: async (ctx: restate.RpcContext) => {
    // настройка до запуска цикла - не должна изменяться во время выполнения запросов
    let state: State = { ... }

    while (true) {
      // работа цикла - не должна изменяться во время выполнения запросов
      state = await ctx.sideEffect(() => mutateState(state))

      if (endCondition) {
        return
      }

      await ctx.sleep(1000 * 3600) // 1 hour
    }
  }
});
const controlApi: restate.ServiceApi<typeof controlService> = { path: "control" };

Как решить эту проблему без использования неограниченно долгих обработчиков? Можно использовать тот же принцип: делать длительные паузы между вызовами. На практике это будет просто хвостовая рекурсия:

const controlService = restate.router({
  setup: async (ctx: restate.RpcContext) => {
    // настройка до цикла
    const state = { ... }

    ctx.send(controlApi).loop(state)
  },
  loop: async (ctx: restate.RpcContext, state: State) => {
    // работа цикла
    const nextState = await ctx.sideEffect(() => mutateState(state))

    if (!endCondition) {
      // планирование очередной итерации
      ctx.sendDelayed(controlApi, 1000 * 3600).loop(nextState) // 1 час
    }
  },
});
const controlApi: restate.ServiceApi<typeof controlService> = { path: "control" };

Здесь нам, опять же, версионировать нужно не так много — только тело запроса, в данном случае состояние цикла, которое должно сохранять согласованность между итерациями.

▍ Виртуальные объекты


Ещё один типичный случай длительных обработчиков — это их использование для «владения» неким сложным состоянием, которое воссоздаётся в памяти из журнала при каждом воспроизведении кода. Этот механизм имитирует принцип работы Durable Objects в Cloudflare. По сути, мы используем журнал конкретного вызова в виде журнала только для записи. Например, можно использовать рабочий поток для хранения состояния шахматной партии. Этот шаблон у нас в Restate ещё недостаточно хорошо проработан (хотя вы можете сами написать для него библиотеку), и ниже мы разберём, почему. Сейчас же просто представим, что у нас есть примитив stream, похожий на signal в Temporal, позволяющий активным обработчикам подтягивать информацию из других обработчиков.

const chessService = restate.router({
  game: async (ctx: restate.RpcContext, gameID: string) => {
    let boardState = { ... }

    // ctx.stream не существует!
    for await (const move of ctx.stream("moves").receive()) {
      if (verifyMove(boardState, move)) {
        boardState = applyMove(boardState, move)
      }
    }
  },
  move: async (ctx: restate.RpcContext, move: Move) => {
    // ctx.stream не существует!
    ctx.stream("moves").send(move)
  },
});
const chessApi: restate.ServiceApi<typeof chessService> = { path: "chess" };

Напомню, что этот обработчик только внешне кажется долгим – среда выполнения может приостанавливать его после любого совершённого хода, что позволяет ему работать в бессерверных системах. Как же в действительности сохраняются предыдущие ходы? Когда обработчик при получении нового хода игрока возобновляет работу, он воспроизводит все полученные ранее ходы, перепроверяет их и выстраивает состояние доски до момента, когда оказывается готов обработать текущий ход. И эта схема не очень удачна. Если предположить, что verifyMove имеет сложность O(1), то мы без необходимости выполняем операцию O(N) для каждого хода. Дело в том, что у среды выполнения нет информации о том, какое состояние обработчика было на момент приостановки. Она лишь знает, как воссоздать его с нуля. И это типичная проблема в подходах с порождением событий. По сути, мы используем бесконечно растущий журнал действий обработчика в качестве базы данных. Грамотное решение, но оно бьёт по производительности, делая механизм приостановки более затратным.

Именно поэтому Restate раскрывает базу данных пар ключ-значение, чтобы вызовы по конкретному ключу могли взаимодействовать с будущими вызовами этого ключа явно посредством состояния:

const chessService = restate.keyedRouter({
  move: async (ctx: restate.RpcContext, gameID: string, move: Move) => {
    const boardState = await ctx.get<BoardState>("boardState")
    if !(verifyMove(boardState, move)) {
      throw new TerminalError(`Invalid move ${move}`)
    }
    ctx.set("boardState", applyMove(boardState, move))
  },
});
const chessApi: restate.ServiceApi<typeof chessService> = { path: "chess" };

Поэтому вместо потоков, которые отправляют сообщения неактивным обработчикам, аккумулирующим состояние в журнале, мы можем просто отправлять RPC сервисам, аккумулирующим состояние в Restate. Это значительно всё упрощает. Для сервисов RPC является стандартным механизмом взаимодействия, а явное состояние – стандартным способом запоминания происходящего. В общем смысле это тот же принцип, что и в типичных сервисах, которые хранят состояние в постоянном хранилище.

Но, подождите-ка… Почему мы не можем просто использовать для этого любое хранилище пар ключ-значение? Многие пишут рабочие потоки, взаимодействующие с Postgres, например. Дело в том, что тогда мы внесём проблему двойной записи. Вам нужно обеспечить синхронизацию хранилища и процесса выполнения, даже в случае сбоев. Одно только устойчивое выполнение вас не спасёт. Вам потребуется реализовать некую семантику транзакций и блокировки хранилища, а также правильно её интегрировать. В случае с Restate вы получаете хранилище, которое будет гарантированно обновляться в соответствии с прогрессом выполнения обработчика, сохраняя согласованность даже в условиях сбоев, состояний гонки и разделения сети.

▍ Заключение


Мы пока не уверены, все ли случаи длительно выполняющихся обработчиков смогут покрыть эти техники, но считаем, что многие. И если мы сможем создавать преимущественно недолгие обработчики, то версионирование можно будет реализовывать путём иммутабельных деплоев, когда старые версии сохраняются до тех пор, пока не завершатся их вызовы, которые обычно будут составлять всего несколько минут.

Но здесь мы не категоричны. Порой длительный обработчик реально оказывается удобным решением проблемы, и связанные с этим вопросы обновления иногда можно уладить с помощью потока управления с учётом версий1 либо путём перезапуска обработчиков в случае изменений.

▍ Примечание


1. Есть возможность обновлять активные рабочие потоки, добавляя в код ветки новых версий, но мы считаем, что такой подход стоит использовать только в крайних случаях, поскольку это сильно усложняет последующее обслуживание кода.

Telegram-канал со скидками, розыгрышами призов и новостями IT 💻