https://habr.com/ru/company/ostrovok/blog/482114/- Блог компании Ostrovok.ru
- Разработка веб-сайтов
- Python
- Программирование
- Django
Всем привет!
Я занимаюсь разработкой и поддержкой сервиса уведомлений в
Ostrovok.ru. Сервис написан на Python3 и Django. Помимо транзакционных писем, пушей и сообщений, сервис также берёт на себя задачи по массовым рассылкам коммерческих предложений (не спам! trust me, отписки у нас работают лучше подписок) пользователям, давшим на это согласие. Со временем база активных получателей разрослась до более миллиона адресов, к чему почтовый сервис не был готов. Я хочу рассказать о том, как новые возможности Python позволили ускорить массовые рассылки и сэкономить ресурсы и с какими проблемами нам пришлось столкнуться при работе с ними.
Исходная реализация
Изначально массовые рассылки были реализованы самым простым способом: на каждого получателя в очередь помещалась задача, которую забирал один из 60 массовых воркеров (особенность наших очередей заключается в том, что каждый воркер работает в отдельном процессе), подготавливал для нее контекст, рендерил шаблон, отправлял HTTP запрос в Mailgun для отправки письма и создавал в базе запись о том, что письмо отправлено. Вся рассылка занимала до 12 часов, отправляя около 0.3 писем в секунду с каждого воркера и блокируя рассылки маленьких кампаний.
Асинхронное решение
Быстрое профилирование показало, что большую часть времени воркеры тратят на установку соединений с Mailgun'ом, поэтому мы стали группировать задачи в чанки, по чанку на каждый воркер. Воркеры стали использовать одно соединение с Mailgun'ом, что позволило сократить время рассылок до 9 часов, отправляя каждым воркером в среднем 0,5 писем в секунду. Последующее профилирование снова показало, что работа с сетью по-прежнему занимает большую часть времени, что и подтолкнуло нас к идее использовать asyncio.
Перед тем как поместить всю обработку в asyncio цикл, нам нужно было продумать решение ряда проблем:
- Django ORM пока ещё не умеет работать с asyncio, однако во время выполнения запросов освобождает GIL. Это значит, что запросы к базе могут выполняться в отдельном потоке и не блокировать работу основного цикла.
- Актуальные версии aiohttp требуют Python версии 3.6 и выше, что в момент реализации потребовало обновить докер образ. Эксперименты на более старых версиях aiohttp и Python 3.5 показали, что скорость отправки на этих версиях гораздо ниже, чем на новых, и сопоставима с последовательной отправкой.
- Хранение большого количества asyncio корутин быстро ведёт к расходованию всей памяти. Это значит, что нельзя заготовить заранее все корутины для писем и вызвать цикл для их обработки, необходимо подготавливать данные по мере отправки уже сформированных писем.
Учитывая все особенности, создадим внутри каждого из воркеров свой asyncio цикл с подобием ThreadPool паттерна, состоящего из:
- Одного или более производителей (producer), работающих с базой данных через Django ORM в отдельном потоке через asyncio.ThreadPoolExecutor. Производитель старается агрегировать запросы получения данных в маленькие батчи, рендерит шаблоны для полученных данных через Jinja2 и складывает данные для отправок в очередь задач.
def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]:
"""Формируем данные для отправки писем, здесь происходит работа с Django ORM и рендером шаблонов."""
return [{'id': id} for id in ids]
async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None:
"""
Группируем получателей в подчанки и формируем для них данные для отправки, которые помещаем в очередь.
Формирование данных требует работы с базой, поэтому выполняем его в ThreadPoolExecutor.
"""
loop = asyncio.get_event_loop()
total = len(ids)
for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE):
subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)]
send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids)
for task in send_tasks:
await task_queue.put(task)
- Нескольких сотен отправщиков писем – asyncio корутины, которые в бесконечном цикле читают данные из очереди задач, отправляют сетевые запросы для каждой из них и складывают результат (ответ, или исключение) в очередь отчётов.
async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]:
"""Отправляем запрос во внешний сервис."""
async with session.post(REQUEST_URL, data=data) as response:
if response.status_code != 200:
raise Exception
return data
async def mail_campaign_sender(
task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession
) -> None:
"""
Забираем из очереди данные и отправляем сетевые запросы.
Нужно не забывать вызывать task_done, чтобы вызывающий код понял, когда завершится отправка.
"""
while True:
try:
task_data = await task_queue.get()
result = await send_mail(task_data, session)
await result_queue.put(result)
except asyncio.CancelledError:
# Корректно обрабатываем остановку корутины
raise
except Exception as exception:
# Обрабатываем ошибки отправки писем
await result_queue.put(exception)
finally:
task_queue.task_done()
- Одного или нескольких воркеров, группирующих данные из очереди отчётов и помещающих в базу данных bulk запросом информацию о результате отправки письма.
def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None:
"""Обрабатываем результаты отправок: исключения и успех и помещаем их в базу данных"""
pass
async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None:
"""
Группируем отчёты в список и передаём на обработку в ThreadPoolExecutor,
чтобы положить в базу данных информацию об отправках.
"""
loop = asyncio.get_event_loop()
results_chunk = []
while True:
try:
results_chunk.append(await result_queue.get())
if len(results_chunk) >= REPORTER_BATCH_SIZE:
await loop.run_in_executor(None, process_campaign_results, results_chunk)
results_chunk.clear()
except asyncio.CancelledError:
await loop.run_in_executor(None, process_campaign_results, results_chunk)
results_chunk.clear()
raise
finally:
result_queue.task_done()
- Очереди задач, являющейся экземпляром asyncio.Queue, ограниченной по максимальному количеству элементов, чтобы производитель не переполнял её, расходуя всю память.
- Очереди отчётов, также являющуюся экземпляром asyncio.Queue с ограничением на максимальное количество элементов.
- Асинхронного метода, который создаёт очереди, воркеры, и завершает рассылку по их остановке.
async def send_mail_campaign(
recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None
) -> None:
"""
Создаёт очереди и запускает воркеры для обработки.
Дожидается завершения формирования получателей, после ждёт окончания отправки и сохранения отчётов.
"""
executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1)
loop = loop or asyncio.get_event_loop()
loop.set_default_executor(executor)
task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop)
result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop)
producers = [
asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT)
]
consumers = [
asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT)
]
reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue))
# Дожидаемся, когда все письма будут подготовлены
done, _ = await asyncio.wait(producers)
# Когда завершатся все отправки, останавливаем воркеров
await task_queue.join()
while consumers:
consumers.pop().cancel()
# Когда завершится сохранение отчётов, также останавливаем соответствующий воркер
await result_queue.join()
reporter.cancel()
- Синхронного кода, который создаёт цикл и начинает рассылку.
async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None:
"""
Закрываем сессию, когда вся обработка завершена.
Документация aiohttp рекомендует добавить задержку перед закрытием сессии.
"""
await asyncio.wait([future])
await asyncio.sleep(0.250)
await session.close()
def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None:
"""
Точка входа для начала рассылки.
Принимает идентификаторы получателей, создаёт asyncio цикл и запускает корутину отправки.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Session
connector = aiohttp.TCPConnector(limit_per_host=0, limit=0)
session = aiohttp.ClientSession(
connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60
)
send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop))
cleanup_future = asyncio.ensure_future(close_session(send_future, session))
loop.run_until_complete(asyncio.wait([send_future, cleanup_future]))
loop.close()
После реализации такого решения время отправки массовых рассылок сократилось до часа при таких же объёмах рассылок и 12 задействованных воркерах. То есть каждый воркер отправляет 20-25 писем в секунду, что в 50-80 раз производительнее исходного решения. Потребление памяти воркеров сохранилось на исходном уровне, загрузка процессора немного выросла, утилизация сети возросла многократно, что является ожидаемым эффектом. Также выросло количество соединений с базой данных, поскольку каждый из потоков воркеров-производителей и воркеров, сохраняющих отчёты, активно работают с базой. При этом освободившиеся воркеры могут рассылать небольшие рассылки в то время, как отправляется массовая кампания.
Несмотря на все преимущества, такая реализация имеет ряд сложностей, которые необходимо учитывать:
- Необходимо быть осторожными при обработке ошибок. Необработанное исключение может завершить выполнение воркера, из-за чего кампания «подвиснет».
- При завершении отправки необходимо не потерять отчёты по получателям, не заполнившие чанк до конца, и сохранить их в базу данных.
- Усложняется логика принудительной остановки возобновления кампаний, поскольку после остановки рассылающих воркеров, необходимо сопоставлять, каким получателям были отправлены письма, а каким – нет.
- Через какое-то время сотрудники поддержки Mailgun связались с нами и попросили снизить скорость отправки, потому что почтовые сервисы начинают временно отклонять письма, если частота их отправок превышает пороговое значение. Это легко сделать, уменьшив количество воркеров.
- Нельзя было бы использовать asyncio, если какой-то из этапов отправки писем выполнял бы требовательные к ресурсам процессора опрерации. Рендер шаблонов с использованием jinja2 оказался не очень ресурсоёмкой операцией и практически не оказывает влияния на скорость отправки.
- Использование asyncio для рассылок требует, чтобы обработчики очереди рассылок запускались отдельными процессами.
Надеюсь, наш опыт будет вам полезен! Если остались вопросы или появились идеи, пишите в комментариях!