Корутины C++ для чайников: пишем асинхронный веб-клиент
- понедельник, 11 марта 2024 г. в 00:00:21
Написать этот материал меня побудило... отсутствие хороших статей по корутинам в C++ в русскоязычном интернете, как бы странно это не звучало. Ну серьезно, C++20 существует уже несколько лет как, но до сих пор почти все статьи про корутины, что встречаются в рунете, относятся к одному из двух типов. Или обзор начинается с самых глубин и мелочей, пересказывая cppreference, а потом автор выдыхается и все сводится к «ну а дальше все понятно, возьмите и примените это в своем коде», что напоминает известную картинку с совой:
Либо иногда в статьях рассматривается применение корутин на примере генераторов, и этим все и ограничивается. Но, давайте будем честны, генераторы — это замечательно, но за все время моей многолетней карьеры разработчика я, вероятно, делал что‑то подобное генераторам разве что разок, в то время как асинхронный ввод‑вывод приходится использовать почти в каждом проекте. И поэтому меня гораздо больше интересует реализация асинхронного ввода‑вывода с использованием корутин, а не генераторы. Поэтому пришлось разбираться во всем самому.
(Если вы все это и так хорошо знаете, можете пропустить и проскроллить ниже)
Когда говорят об «асинхронности», обычно имеют в виду асинхронный ввод‑вывод. Это могут быть операции записи или чтения из файловой системы, обмена данными с внешними устройствами (например, по последовательному порту), или передача и получение информации по сети, такая как запросы от клиентов, обращения к микросервисам или серверам баз данных. В чем разница между синхронным и асинхронным вводом‑выводом? В синхронном подходе, когда мы вызываем, например, метод read() для получения данных из сетевого сокета, выполнение программы (потока) блокируется до тех пор, пока данные не будут получены или не произойдет таймаут. А в случае асинхронного подхода, это больше похоже на создание задания с просьбой сообщить, когда оно будет выполнено: асинхронные вызовы не блокируют поток, позволяя ему заниматься другими делами.
Почему это важно? Представим, вы пишете очень простой сетевой сервис: к вам подключается клиент, отправляет вам запрос, вы этот запрос перенаправляете к серверу базы данных, получаете от него ответ, и отдаете его клиенту. В синхронном стиле алгоритм может выглядеть как‑то так (псевдокод):
while (!terminated)
{
var socket = server.accept(); // принимаем входящее подключение
var request = socket.read(...));
var result = db_сlient.do_query(transform_request(request));
socket.write(transform_result(result));
socket.close();
}
Казалось бы, все хорошо, код простой, очень логичный и хорошо читаемый. И он вполне даже будет работать. Но что произойдет, если у вас будет больше клиентов и запросы будут приходить чаще и чаще? Или если в один и тот же момент к вам подключатся сразу два клиента? В таком случае клиентам придется «ждать в очереди» пока будут обработаны запросы предыдущих клиентов. Более того, может оказаться так, что какому‑нибудь клиенту, с которым вы могли бы разобраться гораздо быстрее даже не ходя в базу данных (например, достав информацию из кэша), придется тоже ждать процессинга предыдущих запросов от других клиентов.
Чтобы улучшить ситуацию, можно каждое входящее подключение обрабатывать в отдельном потоке. Это поможет справиться с проблемой на некоторое время, позволяя обрабатывать десятки и, возможно, даже сотни одновременных подключений. Однако, когда количество запросов становится очень большим — тысячи и десятки тысяч — ваш сервер рано или поздно все равно захлебнется. Поскольку количество процессорных ядер ограничено, операционная система должна постоянно переключать контекст между этими сотнями и тысячами потоков, что является дорогой и медленной операцией. Кроме того, для каждого потока потребуется выделить память под его стек. И вы будете иметь тысячи потоков, которые в большей части времени еще будут простаивать, ожидая обработки.
И вот тут на помощь приходит асинхронщина. Теперь нам не надо делать в каждом потоке read() и ждать. Вместо этого мы можем иметь один (ну или несколько, по количеству процессорных ядер) поток, в котором будем в цикле вызывать функцию API операционной системы и говорить ей: «Вот тебе список файловых дескрипторов, например, сокетов. Как только в любом из них что‑нибудь произойдет, например, будут приняты какие‑то данные, сразу же скажи мне». Такими функциями могут быть select() или poll(), или более эффективные из семейства epoll, или самый новый механизм под названием io_uring (это про Linux, в других системах есть аналогичные механизмы). И потом, в зависимости от того, в каком дескрипторе и что именно произошло, мы быстро обработаем это событие и будем ждать следующего. Таким образом, всего одним или несколькими потоками мы сможем эффективно обрабатывать тысячи и десятки тысяч соединений и событий.
Но тут появляется проблема — структурирование и читаемость кода. Если при синхронном подходе все предельно просто и понятно — взяли данные отсюда, положили сюда, и так далее, то в случае с асинхронщиной все становится гораздо сложнее и запутаннее — вы отправили запрос с одном месте, а обрабатываете ответ в совершенно другом.
Чтобы немного исправить ситуацию, можно использовать механизм коллбэков. Для этого можно добавить некоторые обертки и абстракции над event loop. Например, при вызове функции read_async() можно передать в неё коллбэк в качестве одного из аргументов. В С это может быть просто указатель на функцию, а в C++ — std::function, что позволяет использовать std::bind и лямбды. Этот уже лучше, но все равно сложнее и дает менее читаемый код, чем при синхронном подходе. Особенно если логика и алгоритмы усложняются, возникает риск попасть в так называемый «callback hell», когда один коллбэк вызывает второй, а тот третий, и так далее, в результате чего код становится сложным и запутанным из‑за размазывания логики по множеству мелких функций или кучи вложенных лямбд. Кроме того, существует риск забыть вызвать что‑то в каком‑то случае из‑за невнимательности.
И вот тут нам на помощь приходят корутины. По сути дела, они сочетают сильные стороны обоих подходов - мы пишем код, который выглядит как синхронный, но по факту он работает асинхронно. Посмотрим еще раз на искусственный пример выше (псевдокод):
var socket = await server.accept(); // принимаем входящее подключение
var request = await socket.read(...));
var result = await db_сient.do_query(transform_request(request));
await socket.write(transform_result(result));
socket.close();
Компилятор сделает особую магию. Грубо говоря, из этой одной функции он сделает сразу несколько. Вызвав server.accept() из первой строки, поток выполнения будет освобожден, а все что дальше — станет коллбэком (кодом, который выполнится, когда у нас будет на руках результат операции accept). Когда вызовется коллбэк, будет выполнена вторая инструкция read(), поток выполнения снова будет освобожден, а последующие инструкции будут коллбэком. И так далее.
На самом деле компилятор не будет генерировать много разных функций, а сгенерирует одну, но с машиной состояний (конечным автоматом) внутри, и соответственно, при вызовах нашей функции точка выполнения будет перескакивать в разные места кода, в зависимости от того, на каком этапе мы находимся. Это называется Duff's Device, в честь Тома Даффа, который запилил этот механизм в начале 1980-х годов для оптимизации копирования массивов данных разной длины на слабом железе. Те оптимизации давно уже не нужны, но почти тот же механизм используется для создания конечных автоматов при компиляции корутин.
Остаётся один важный вопрос: как обстоит дело со стеком? В нашей функции мы определяем локальные переменные, передаём ей аргументы при её вызове, а при выходе из функции стек‑фрейм может (и будет) быть перезаписан вызовом другой функции, что означает, что данные могут быть утеряны между вызовами. Что делать?
Саймон Тэтхем, автор легендарной утилиты PuTTy в своей статье про реализацию корутин на Си, и Адам Данкелс, автор сишной библиотеки protothreads (кстати, она очень крутая!), предлагают радикальное решение этой проблемы — отказаться от использования локальных переменных. Вместо этого можно использовать статические переменные, глобальные переменные или выделять память в куче, но использовать локальные переменные нельзя, ни‑ни, ведь нет никаких гарантий сохранности данных между вызовами.
Мы, естественно, так с ума сходить не будем, в современном мире есть два гораздо более удобных способа не потерять контекст при переходах между состояниями: stackful‑ и stackless‑корутины.
Идея stackful‑корутин достаточно проста. Стек представляет собой область памяти, адрес которой хранится в регистре процессора и инкрементируется/декрементируется при вызове функций или выходе из них. Технически, перед вызовом функции мы можем выделить блок памяти и изменить в регистре процессора указатель на стек, чтобы он указывал на новый блок. При выходе из функции мы восстанавливаем исходный указатель на стек, а при следующем запуске функции снова меняем указатель на тот наш блок памяти. Примером реализации stackful‑корутин является известная библиотека Boost.Coroutine2. Хотя такой подход не очень кросс‑платформенный из‑за различий в регистрах указателя на стек на разных архитектурах, требует низкоуровневых решений, и не очень удобен (корутины запускаются макросами), он имеет преимущество: для использования таких корутин не требуется поддержка компилятором, и можно писать код на корутинах даже с использованием древних версий C++ (например, C++11).
Stackless корутины, как подсказывает название, отличаются тем, что не используют стек. Вместо этого, компилятор анализирует функцию, и, очень упрощенно говоря, объединяет все локальные переменные в одну структуру данных (так называемый фрейм корутины), а затем перед вызовом функции выделяет блок памяти под эту структуру в куче. В дальнейшем все операции с локальными переменными выполняются в этом блоке памяти.
Впрочем, кажется что-то мы засиделись с теорией. Давайте уже переходить к практике.
Для лучше понимая разницы, чем отличается реализация с коллбэками и корутинами, и как передти от первого ко второму, мы сначала напишем веб-клиент на коллбэках. Веб-клиент очень простой - он будет уметь делать HTTP GET-запросы к заданному URL и возвращать HTTP-код плюс строковой буффер с данными, полученными от сервера. Для этого мы будем использовать популярную библиотеку CURL.
// скомпилировать пример можно так:
// clang++ -std=c++20 -stdlib=libc++ -lcurl -lstdc++ -Wall -Wextra -Wpedantic ./curl_async.cpp
// или
// g++ ./curl_async.cpp --std=c++20 -lcurl -Wall -Wextra -Wpedantic
#include <atomic>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <curl/curl.h>
class WebClient
{
public:
WebClient();
~WebClient();
struct Result
{
int code;
std::string data;
};
using CallbackFn = std::function<void(Result result)>;
void runLoop();
void stopLoop();
void performRequest(const std::string& url, Cb cb);
private:
struct Request
{
CallbackFn callback;
std::string buffer;
};
static size_t writeToBuffer(char* ptr, size_t, size_t nmemb, void* tab)
{
auto r = reinterpret_cast<Request*>(tab);
r->buffer.append(ptr, nmemb);
return nmemb;
}
CURLM* m_multiHandle;
std::atomic_bool m_break{false};
};
WebClient::WebClient()
{
m_multiHandle = curl_multi_init();
}
WebClient::~WebClient()
{
curl_multi_cleanup(m_multiHandle);
}
void WebClient::performRequest(const std::string& url, CallbackFn cb)
{
Request* requestPtr = new Request{std::move(cb), {}};
CURL* handle = curl_easy_init();
curl_easy_setopt(handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &WebClient::writeToBuffer);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, requestPtr);
curl_easy_setopt(handle, CURLOPT_PRIVATE, requestPtr);
curl_multi_add_handle(m_multiHandle, handle);
}
void WebClient::stopLoop()
{
m_break = true;
curl_multi_wakeup(m_multiHandle);
}
void WebClient::runLoop()
{
int msgs_left;
int still_running = 1;
while (!m_break) {
curl_multi_perform(m_multiHandle, &still_running);
curl_multi_poll(m_multiHandle, nullptr, 0, 1000, nullptr);
CURLMsg* msg;
while (!m_break && (msg = curl_multi_info_read(m_multiHandle, &msgs_left)))
{
if (msg->msg == CURLMSG_DONE)
{
CURL* handle = msg->easy_handle;
int code;
Request* requestPtr;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code);
curl_easy_getinfo(handle, CURLINFO_PRIVATE, &requestPtr);
requestPtr->callback({code, std::move(requestPtr->buffer)});
curl_multi_remove_handle(m_multiHandle, handle);
curl_easy_cleanup(handle);
delete requestPtr;
}
}
}
}
В принципе, во все это можно глубоко не вникать, для дальнейшего понимания работы с корутинами достаточно видеть и быть готовым использовать его публичный интерфейс, а именно метод performRequest(), но для тех, кому интересно и кто не знаком с CURL, вкратце объясню, как это работает внутри.
Когда вызывается performRequest(), мы создаем структуру типа Request, в которой у нас хранится строковой буфер (std::string) для результата и колобок коллбэк (std::function), который надо будет вызвать когда запрос будет выполнен. Тут же мы функцией curl_easy_init() создаем хендл CURL* для нашего запроса и устанавливаем в нем разные параметры: URL куда надо сходить, PRIVATE (пользовательские данные, мы туда кладем указатель на наш Request*, чтобы потом найти, а что это вообще был за запрос и что делать с результатом), WRITEDATA (указатель на функцию, которая будет сохранять получаемые данные в наш буфер). После этого функцией curl_multi_add_handle() мы добавляем наш свежесозданный дескриптор нового запроса (типа CURL*) в механизм curl_multi, который уже и будет делать всю асинхронную магию.
Функция loop() — это непосредственно наш event loop, в котором эта асинхронная магия и происходит. Сначала вызывается curl_multi_perform(), в котором механизм curl multi делает свои дела: устанавливает подключения, отправляет данные, если есть что отправить, и т. д. Далее вызывается curl_multi_poll(), ожидающая событий в любых из наблюдаемых сокетов, тех, что мы до этого добавили функцией curl_multi_add_handle. Когда у нас произошли какие‑то события, мы поочередно выгребаем их функцией curl_multi_info_read() и смотрим, что же произошло — если видно, что запрос выполнился (CURLMSG_DONE), мы берем из CURLINFO_PRIVATE указатель на наш Request, вызываем коллбэк, после чего удаляем хендл из multi, освобождаем сам хендл и освобождаем Request. Их работа выполнена.
Использовать все это можно примерно так:
int main(void)
{
WebClient client;
std::thread worker(std::bind(&WebClient::runLoop, &client));
client.performRequest("https://postman-echo.com/get", [](WebClient::Result res)
{
std::cout << "Req0 Code: " << res.code << std::endl;
std::cout << "Req0 Data: '" << res.data << "'" << std::endl << std::endl;
});
client.performRequest("http://www.gstatic.com/generate_204", [&](WebClient::Result res1)
{
std::cout << "Req1 Code: " << res1.code << std::endl;
std::cout << "Req1 Data: '" << res1.data << "'" << std::endl << std::endl;
client.performRequest("http://httpbin.org/user-agent", [](WebClient::Result res2)
{
std::cout << "Req1-2 Code: " << res2.code << std::endl;
std::cout << "Req1-2 Data: '" << res2.data << "'" << std::endl << std::endl;
});
});
client.performRequest("http://httpbin.org/ip", [](WebClient::Result res)
{
std::cout << "Req2 Code: " << res.code << std::endl;
std::cout << "Req2 Data: '" << res.data << "'" << std::endl << std::endl;
});
std::cin.get();
client.stopLoop();
worker.join();
return 0;
}
Запустили event loop, после чего можем накидывать в него таски. Они могут выполняться как параллельно, так и последовательно - в примере выше Req1-2 выполнится только когда мы получим результат выполнения Req1. Event loop даже не обязательно запускать в отдельном потоке, можно сначала накидать тасок, а потом уже крутить цикл прямо там же.
Если механика работы с CURL не особо понятна, то не стоит переживать. Когда мы начнем переходить на корутины, мы не будем изменять этот код, а только добавим пару дополнительных свистелок, переиспользующих метод performRequest()
.
Если кто-то захочет использовать этот код в продакшене - имейте в виду, он специально упрощен и требует доработки. Например, там не используется RAII, там почти нет обработки ошибок, не задаются таймауты, если вы остановите event loop до выполнения всех запросов, то получите утечку памяти, а еще вы можете получить race condition, потому что API CURL не потокобезопасный (например, есть смысл не сразу вызывать curl_multi_add_handle, а добавлять хендлы во временный list, и вызывать для них curl_multi_add_handle уже в потоке event loop'а в его свободное время). Но в нашем случае это все не играет большой роли, поэтому код сделан максимально простым, пусть и не совсем корректным и эффективным.
А дальше переходим к самому интересному. Корутинам.
В отличие от многих других языков типа C# или Javascript, где рантайм берет на себя огромное количество головной боли программиста при работе с корутинами, в C++ все придется делать ручками. Более того, то, что мы имеем в C++20 - это вообще самые базовые и низкоуровневые примитивы для корутин, они скорее не для повседневного использования, а для разработчиков библиотек. Но мы же не ищем легких путей и готовы к трудностям, правда?
С точки зрения компилятора, корутиной является функция, в которой есть хотя бы один из трех операторов: co_yield, co_await или co_return. Да, нужно отметить, что корутиной может быть, например, лямбда, но не может быть main(), конструктор, деструктор, constexpr-функция, обработчик исключений, ну и там есть еще ряд ограничений. Теперь про операторы. co_yield - это для генераторов, к нашей теме не относится, про co_return мы поговорим позже, а вот co_await - это то, что для нас самое интересное. Вызов co_await делает буквально то, что нам нужно - приостанавливает выполнение функции, которое будет продолжено когда-то потом с точки останова. Взяв пример выше, с co_await он может выглядеть примерно так:
Result r1 = co_await client.performRequestAsync("https://postman-echo.com/get");
std::cout << "Req1 ready: " << r1.code << " - " << r1.data << std::endl;
Result r2 = co_await client.performRequestAsync("http://httpbin.org/user-agent");
std::cout << "Req2 ready: " << r2.code << " - " << r2.data << std::endl;
В итоге сначала сработает первая строка, далее поток будет освобожден, но когда мы в event loop'е получим результат выполнения запроса, будут выполнены 2 и 3 строки, после чего корутина опять заснет до получения результата, а проснувшись продолжит выполнение с 4-ой строки.
Оператор co_await применяется к тому, что называется Awaitable. Awaitable может быть все что угодно, имеющее как минимум три следующих метода: await_ready, await_suspend, await resume:
template <typename T>
struct Awaitable<T> {
bool await_ready() const noexcept {
// а нужно ли нам вообще засыпать, может все уже и так готово
// и мы можем продолжить сразу?
}
void await_suspend(std::coroutine_handle<> handle) noexcept
{
// здесь мы можем запустить какой-то процесс,
// по завершению которого нами будет вызван handle.resume()
}
T await_resume() const noexcept {
// здесь мы вернем вызывающей стороне результат операции,
// ну или void если не хотим ничего возвращать
}
};
await_ready()
вызывается в самом начале, когда происходит co_await. Если из нее вернулось false, то дальше будет вызван await_suspend. А если из нее вернулось true, то значит что все уже и так готово, suspend'ится не обяательно, и можно сразу переходить к await_resume в том же потоке. Чаще всего вы отсюда будете возвращать false, но есть случаи, когда true тоже имеет смысл, я потом приведу один пример.
await_suspend()
вызывается когда нам нужно "приостановить" выполнение корутины. Именно здесь мы можем запустить какой-то асинхронный процесс. В качестве аргумента в эту функцию передается coroutine_handle, и это для нас очень важно, потому что когда наш асинхронный процесс завершится, мы должны будем вызвать handle.resume()
чтобы возобновить выполнение корутины.
Простой пример, даже без I/O: мы делаем много-много легких маленьких операций, но в какой-то из многочисленных if...else ветвей должны сделать что-то долгое и ресурсоемкое. Чтобы не блокировать поток надолго, в await_suspend() мы можем запустить std::thread с нужной нам логикой, и в конце его выполнения сделать handle.resume(), в результате чего выполнение корутины продолжится в том же новом потоке.
Из await_suspend() может возвращаться: 1) void — после этого контроль возвратится к тому, кто нас вызвал, сама же корутина будет приостановлена 2) true/false — если true, то то же самое что и при void, если false, то корутина не будет приостановлена, и выполнение продолжится дальше 3) coroutine_handle какой‑то другой корутины, она будет возоблена.
await_resume()
вызовется после предыдущего шага (когда кто-нибудь запустит handle.resume()). Отсюда можно просто вернуть результат операции, или, если мы ничего не хотим возвращать, вернуть void.
Вы можете спросить "Так, подождите, handle.resume() не принимает никаких аргументов, как мы вернем результат операции из await_resume()?" Да просто - сохраним его прямо в нашем Awaitable :)
Ближе к делу, напишем Awaitable, который запустит запрос в нашем веб-клиенте:
struct RequestAwaitable {
RequestAwaitable(WebClient& client_, std::strinf url_) : client(c_), url(std::move(url_)) {};
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) noexcept
{
client.performRequest(std::move(url), [handle, this](Result res)
{
result = std::move(res);
handle.resume();
});
}
Result await_resume() const noexcept { return std::move(result); }
WebClient& client;
std::string url;
Result result;
};
RequestAwaitable WebClient::performRequestAsync(std::string url)
{
return RequestAwaitable(*this, std::move(url));
}
await_ready() у нас всегда возвращает false (мы еще даже не начали запрос, значит мы точно не готовы), в await_suspend() мы дергаем наш уже знакомый performRequest, по выполнению которого сохраняем результат в мембер структуры и запускаем await_resume(). В await_resume() мы просто отдаем то что получилось. И бонусом чуть ниже объявляем хелпер-функцию performRequestAsync
, которая сконструирует наш RequestAwaitable как надо, создавая Awaitable и кладя туда все что нужно для его использования (аргументы, ссылку на клиент, и т.д.).
На самом деле, если нет цели поддерживать API с коллбэками, то можно даже избавиться от performRequest()
c std::function, а делать то, что мы делали в нем, сразу в await_suspend()
, и сохранять в Request не коллбэк, а сразу coroutine_handle для продления. Будем считать такую переделку домашним заданием для любознательных :)
Из await_resume() можно кидаться исключениями. Например, если при выполнении запроса произошла ошибка, в await_resume вы это проверите и сделаете throw. В этом случае корутина после возобновления продолжит свое выполнение с вашим активным исключением (и где-то там вы его потом поймаете).
Еще можно результат выполнения операции можно хранить не как Result, а как std::future<Result>. При запуске await_suspend() или сразу в конструкторе мы создадим std::promise<Result>, сделаем promise.get_future(), и сохраним его. Когда операция выполнится, мы можем сделать promise.set_value(), а в await_resume(), соответственно, вернуть future.get(). Потому что кроме promise.set_value() можно будет еще делать promise.set_exception(), то есть таким образом можно будет сохранять возникшие где-то в процессе операции исключения целиком и кидать их еще раз при пробуждении корутины.
Но и еще что я точно знаю - вы можете кидаться исключения и из await_suspend(). В этом случае корутина сразу продолжит свое выполнение с вашим активным исключением.
Давайте представим, что мы немного переписали нашу корутину и сделали что-то такое:
auto req1 = client.performRequestAsync("https://postman-echo.com/get");
auto req2 = client.performRequestAsync("http://httpbin.org/user-agent");
co_await req1;
co_await req2;
Оно будет работать так же, как и предыдущая реализация - сначала запустится первый запрос, а по его завершении запустится уже второй.
Но... мы можем перенести нашу логику запуска запроса (client.performRequest(...)) из await_suspend() в... конструктор нашего Awaitable. По завершении мы сохраним результат в поле result, и чтобы можно было понять, есть ли уже результат или нет, добавим флаг std::atomic_bool, либо немного изменим result и будем хранить его как std::optional (к нему может понадобится мьютекс). На случай, если результата еще нет, в await_suspend мы сохраним coroutine_handle как член структуры, либо сохраним std::function который вызовет его .resume(), чтобы продолжить выполнение по окончанию процесса.
В результате наша асинхронная операция запустится еще в момент конструирования Awaitable, и к моменту, когда мы сделаем co_await, она уже может быть выполнена и у нас на руках уже будет результат - тогда мы можем вернуть await_ready() = true, что означает, что нет смысла приостанавливать корутину для ожидания, можно сразу же продолжать дальше.
И код, который приведен выше, будет работать уже по-новому - запросы начнут выполняться еще в момент performRequestAsync() параллельно, а потом в co_await мы просто дождемся, когда они все будут выполнены.
Итак, вроде все, мы теперь можем делать co_await performRequestAsync
и все будет работать? Ха, нет. Я немного слукавил, на самом деле для того, чтобы функция была корутиной, нужно, чтобы в ней был один из операторов co_yield/co_await/co_return, но важно и еще кое-что. Возвратным типом функции должно быть что-то, у чего объявлен тип promise_type, а у самого promise_type должен быть определенный набор методов. Как предлагает это делать нам cppreference:
struct promise;
struct coroutine : std::coroutine_handle<promise>
{
using promise_type = ::promise;
};
struct promise
{
coroutine get_return_object() { return {coroutine::from_promise(*this)}; }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
Давайте разбираться, что же это за промис. Сразу надо сказать, что этот промис не имеет никакого отношения к уже знакомому многим std::promise, это совершенно разные типы не имеющие ничего общего. Теперь по методам:
initial_suspend()
- можно вернуть std::suspend_never или std::suspend_always. Определяет, будет ли корутина в спящем состоянии с самого начала (suspend_always), или же после создания она выполнится до первой точки останова (co_await) и заснет только потом. В своих юзкейсах мне везде нужен был std::suspend_never, потому что перед тем как заснуть нужно что-то запланировать, а зачем может нужен suspend_always в реальном мире я пока еще до конца не познал, возможно для генераторов или lazy evaluations.
get_return_object()
- определяет, что именно вернется вызывающей стороне в момент первой приостановки корутины (сразу же после ее создания если initial_suspend = true, или же в момент первого co_await/co_return/co_yield если initial_suspend = false). Часто не заморачиваются и возвращают себя самих, но можно вернуть и что-то другое.
final_suspend()
- тоже можно вернуть std::suspend_never или std::suspend_always. Определяет, будет ли корутина приостановлена еще раз, когда ее выполнение закончено (дошло до конца функции или был вызван co_return). В случае с std::suspend_never она нам больше не нужна и мы больше ее не используем, это самый простой и часто используемый случай, а при std::suspend_always она приостанавливается еще раз, давая нам возможность поделать еще какую-нибудь черную магию, но будьте аккуратны - в ряде случаев при suspend_always не выполняется автоматическое освобождение ресурсов, и есть риск получить утечку если забыть об этом (я в конце дам ссылку на статью где рассказывается при такие юзкейсы).
return_void()
- сработает при достижении конца функции, или если будет вызван co_return;
без возвращаемого значения. Если же мы захотим делать co_return something;
, то есть вернуть что-то из корутины, то должен быть объявлен метод void return_value (T&& value)
, в котором используется вы сохранените возвращаемое значение куда-нибудь в промис, чтобы потом передать его вызывающей стороне.
unhandled_exception()
- определяет, что мы будем делать, если в корутине произойдет необработанное исключение. Тут мы можем сделать std::abort, как это обычно происходит при необработанных исключениях, или что-то красиво залоггировать. unhandled_exception() вызывается внутри неявного catch-блока, так что вам ничего не запрещает, например, поймать это исключение с std::current_exception(), сохранить его куда-нибудь в exception_ptr, а потом снова бросить где надо.
Впрочем, хватит теории. Давайте теперь c учетом вышесказанного напишем тип (назовем его Task), который должна "вернуть" наша корутина. Все максимально просто: результатом работы корутины будет void, не засыпаем после создания, и не засыпаем после завершения:
struct promise;
struct Task : std::coroutine_handle<promise>
{
using promise_type = ::promise;
};
struct promise
{
Task get_return_object() { return Task{}; }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
Просто до безобразия. Готово, теперь мы можем запускать нашу корутину. main() не может быть корутиной, поэтому добавим еще одну промежуточную функцию:
Task doSomething(WebClient& client)
{
auto r1 = co_await client.performRequestAsync("https://postman-echo.com/get");
std::cout << "Req1 ready: " << r1.code << " - " << r1.data << std::endl;
auto r2 = co_await client.performRequestAsync("http://httpbin.org/user-agent");
std::cout << "Req2 ready: " << r2.code << " - " << r2.data << std::endl;
}
int main(void)
{
WebClient client;
std::thread worker(std::bind(&WebClient::runLoop, &client));
doSomething(client);
std::cin.get();
client.stopLoop();
worker.join();
};
Обратите внимание, наш тип Task получился вполне универсальный - в нем нет никакой специфики касательно того, что мы делаем. Поэтому ни что не мешает использовать его во многих местах приложения, как этакий библиотечный кирпичик.
Компилируем, запускаем... и все работает! Теперь мы можем создавать такие корутины и дергать co_await() из них где угодно и как угодно.
Как говорил в моей юности мой батя, "Сынок, занимаясь удовольствиями, не забывай предохраняться". Код на корутинах визуально похож на обычный синхронный код, но нам важно не забывать, что по-факту он все-таки асинхронный, и поэтому можно наткнуться на много подводных камней из мира асинхронности и многопоточности. Например, самое простое - аргументы функций. Если мы получили их по значению, то они попадут во фрейм корутины, и мы сможем продолжать работать с ними после ее возобновления. А вот если что-то было передано по ссылке, если это временный объект, то скорее всего после возобновления корутины он уже не будет существовать, вы получите dangling reference. Решение - передавать аргументы по значению, либо сохранять из значения в локальные переменные в самом начале (если у вас initial_suspend не suspend_always, в этом случае не поможет). Еще важный момент - итераторы. Если вы делаете, например, for_each для какого-нибудь контейнера с co_await, то между приостановкой и возобновлением корутины кто-нибудь может успеть сделать что-то с этим контейнером, что инвалидирует итераторы, и вы получите сегфолт со спецэффектами. Подробнее об этом можно прочитать в статье "Недостатки корутин C++".
В принципе, Awaitable'ов и этого простого Task'а уже достаточно, чтобы делать очень и очень многое и начать использовать корутины в своих проектах.
Но внимательный читатель вероятно задастся вопросом: окей, а как я могу запустить корутину из корутины? Например, можно ли внутри одной корутины сделать co_await в другую корутину? Как оно будет работать, ведь после co_await вторая корутина возобновится только до ее конца, а как мы можем возобность выполнение первой с этого места?
И вот тут мы подошли вплотную к кроличьей норе. То, что описано выше, и то, что многим хочется сделать, называется nested coroutines. Проблема в том, что про них тоже почти нет нормальных статей с нормальными примерами, максимум - куски реализации разной кривости и работоспособности из многочисленных вопросов на stackoverflow. Но мне в итоге посчастливилось найти отличную статью на эту тему, где автор подробно разобрал этот вопрос и привел рабочие примеры. Вот она: Yet Another C++ Coroutine Tutorial (если вдруг с ней что-то случится, копия осталась в веб-архиве). Там еще раз разобрана основная теория и практика по корутинам, а то, что нас интересует, находится в параграфе "Awaitable Coroutines". Пересказ/перевод описанного там тянет на отдельную статью, там используется два awaitable'а, один из которых возвращает coroutine_handle внешней корутины из await_suspend, а другой запускает ее сохраненный .resume() в своем await_suspend(), в промисе в final_suspend() возвращается этот второй awaitable, а еще из-за того что используется final_suspend, там приходится вручную кое что подчищать. Звучит сложно, но в итоге все работает как надо, можно вызывать одну корутину из другой, и после возобновления и завершения первой, возобновится и вторая. Серьезно, почитайте, там очень интересно, но для нетерпеливых я приведу код оттуда:
template <typename Result = void>
class [[nodiscard]] Task {
public:
struct FinalAwaiter {
bool await_ready() const noexcept { return false; }
template <typename P>
auto await_suspend(std::coroutine_handle<P> handle) noexcept
{
return handle.promise().continuation;
}
void await_resume() const noexcept { }
};
struct Promise {
std::coroutine_handle<> continuation;
Result result;
Task get_return_object()
{
return Task { std::coroutine_handle<Promise>::from_promise(*this) };
}
void unhandled_exception() noexcept { }
void return_value(Result&& res) noexcept { result = std::move(res); }
std::suspend_always initial_suspend() noexcept { return {}; }
FinalAwaiter final_suspend() noexcept { return {}; }
};
using promise_type = Promise;
Task() = default;
~Task()
{
if (handle_) {
handle_.destroy();
}
}
struct Awaiter {
std::coroutine_handle<Promise> handle;
bool await_ready() const noexcept { return !handle || handle.done(); }
auto await_suspend(std::coroutine_handle<> calling) noexcept
{
handle.promise().continuation = calling;
return handle;
}
template <typename T = Result>
requires(std::is_same_v<T, void>)
void await_resume() noexcept { }
template <typename T = Result>
requires(!std::is_same_v<T, void>)
T await_resume() noexcept { return std::move(handle.promise().result); }
};
auto operator co_await() noexcept { return Awaiter { handle_ }; }
private:
explicit Task(std::coroutine_handle<Promise> handle)
: handle_(handle)
{
}
std::coroutine_handle<Promise> handle_;
};
template <>
struct Task<void>::Promise {
std::coroutine_handle<> continuation;
Task get_return_object()
{
return Task { std::coroutine_handle<Promise>::from_promise(*this) };
}
void unhandled_exception() noexcept { }
void return_void() noexcept { }
std::suspend_always initial_suspend() noexcept { return {}; }
FinalAwaiter final_suspend() noexcept { return {}; }
};
Причем благодаря шаблонности и отдельной специализации шаблона для void, такой Task может и возвращать значения из корутин (цепочки вложенных корутин), а может не возвращать ничего, как вам угодно.
Я подозреваю, что у многих читателей после всего этого возникнет вполне предсказуемая реакция: "WTF? Серьезно? Почему ТАК сложно? Зачем столько танцев с бубном чтобы запустить простейшие корутины?!". Фрустрация вполне понятна. Как я уже говорил выше, то, что успело войти в стандарт C++20 - это только самые низкоуровневые примитивы для корутин. Они предназначены не сколько для повседневного пользования, сколько для разработчиков библиотек. Соответственно, чтобы удобно их использовать, вы тоже должны почувствовать себя разработчиком библиотеки - создать разные Awaitable в тех местах, где происходит переход из мира корутин в мир потоков или низкоуровневых вызовов, объявить классы типа описанного выше простого Task или сложного Task c поддержкой вложенности (как я уже говорил, эти классы довольно универсальны, вам не придется плодить десятки их для разных применений, хватит парочки на все случаи жизни), и после этого уже без особо головной боли писать в остальных частях вашего приложения код с корутинами с удовольствием. Либо можно использовать готовые библиотеки, например, cppcoro - на сегодняшний день это самая мощная и популярная библиотека для написания корутинного кода на C++. В ней вы найдете не только кирпичики и примитивы для корутин, типа описанных выше Task, но и разные интересные обертки, например, async_mutex, а также многочисленные Awaitable'ы для работы с файлами, сокетами, и т.д.