https://habr.com/ru/post/502380/- Open source
- Python
- Программирование
- Параллельное программирование
Привет, Хабр! Я хочу рассказать, как я решал проблему эффективного конкурентного исполнения asyncio задач в Celery.
Введение
Celery — большой проект со сложной историей и тяжким бременем обратной совместимости. Ключевые архитектурные решения принимались задолго до появления asyncio в python. Поэтому тем более интересно, что запустить как-нибудь asyncio-задачу можно в celery из коробки.
Зануда mode on
Формально после
#839. Субъективно для меня переименование пакета не меняет архитектуру приложения. Зануда mode off.
Запускаем asyncio задачи в ванильном celery
Запустить asyncio задачу можно из коробки:
import asyncio
from .celeryapp import celeryapp
async def async_task():
await asyncio.sleep(42)
@celeryapp.task
def regular_task():
coro = async_task()
asyncio.run(coro)
Очевидные плюсы:
- Работает же!
- Просто
- Нет дополнительных внешних зависимостей
Что, собственно, не так?
- Event Loop создаётся внутри каждого воркера
- Переключения контекста между исполняющимися корутинами не происходит
- В один момент времени воркер исполняет не более одной корутины
- Общие ресурсы не шарятся между задачами
- Бойлерплейт
То есть asyncio в данном случае используется ради asyncio, а преимуществ никаких
Попробуем хитрее?
Я боролся за производительность:
import asyncio
import threading
from .celeryapp import celeryapp
celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
target=celeryapp.loop.run_forever,
daemon=True,
)
celeryapp.loop_runner.start()
async def async_task():
await asyncio.sleep(42)
@celeryapp.task
def regular_task():
coro = async_task()
asyncio.run_coroutine_threadsafe(
coro=coro,
loop=celeryapp.loop,
)
Бинго! Плюсы:
- Всё ещё работает
- Даже более-менее эффективно
- Даже ресурсы шарятся между корутинами в пределах воркера
- Всё ещё нет дополнительных внешних зависимостей
Бинго? Проблемы не заставили себя ждать:
- Celery не знает ничего про запущенную корутину
- Вы теряете контроль над исполнением таски
- Вы теряете контроль над исключениями
- Бойлерплейт
Идти с таким чудесным инженерным решением в прод у меня как-то рука не поднялась
Постановка задачи
- Должно работать
- Корутины должны исполняться конкурентно
- Ресурсы должны шариться между множеством исполняемых тасок
- Никакого бойлерплейта
- Простой предсказуемый API
То есть, мои ожидания:
import asyncio
from .celeryapp import celeryapp
@celeryapp.task
async def async_task():
await asyncio.sleep(42)
Итоги
Свои идеи я реализовал в библиотеке
celery-pool-asyncio. Эта библиотека используется используется нашей командой в текущем проекте, и мы уже выкатились в прод.
Коротко о возможностях
Помимо непосредственно исполнения asyncio задач, celery-pool-asyncio также решает проблемы:
- Шедулинг асинхронных задач
- Поддержка корутин в сигналах celery
Для того, чтобы заставить celery-pool-asyncio работать, я использовал
monkey patching. Для каждого применяемого в рантайме патча предусмотрена возможность отключения.
Обо всём этом подробнее можно прочитать в документации
Планы
По-хорошему, нужно интегрировать пул в celery. С другой стороны, разработчики celery во всю прототипируют celery 5.0, который будет асинхронным. Стоит ли игра свеч?
Примеры
Для демонстрации возможностей моих библиотек
celery-pool-asyncio и
celery-decorator-taskcls (
статья) был реализован
тестовый проект.
Прочее
Я пытался то же самое рассказать на митапе