https://habr.com/ru/post/525912/Всем привет. Я начал работать над
библиотекой для выдергивания данных из разных json api. Также она может использоваться для тестирования api.
Апишки описываются в виде классов, например
class Categories(JsonEndpoint):
url = "http://127.0.0.1:8888/categories"
params = {"page": range(100), "language": "en"}
headers = {"User-Agent": get_user_agent}
results_key = "*.slug"
categories = Categories()
class Posts(JsonEndpoint):
url = "http://127.0.0.1:8888/categories/{category}/posts"
params = {"page": range(100), "language": "en"}
url_params = {"category": categories.iter_results()}
results_key = "posts"
async def comments(self, post):
comments = Comments(
self.session,
url_params={"category": post.url.params["category"], "id": post["id"]},
)
return [comment async for comment in comments]
posts = Posts()
В params и url_params могут быть функции(как здесь get_user_agent — возвращает случайный useragent), range, итераторы, awaitable и асинхронные итераторы(таким образом можно увязать их между собой).
В параметрах headers и cookies тоже могут быть функции и awaitable.
Апи категорий в примере выше возвращает массив объектов, у которых есть slug, итератор будет возвроащать именно их. Подсунув этот итератор в url_params постов, итератор пройдется рекурсивно по всем категориям и по всем страницам в каждой. Он прервется когда наткнется на 404 или какую-то другую ошибку и перейдет к следующей категории.
А репозитории есть пример aiohttp сервера для этих классов чтобы всё можно было протестировать.
Помимо get параметров можно передавать их как data или json и задать другой method.
results_key разбивается по точке и будет пытаться выдергивать ключи из результатов. Например «comments.*.text» вернет текст каждого комментария из массива внутри comments.
Результаты оборачиваются во wrapper у которого есть свойства url и params. url это производное строки, у которой тоже есть params. Таким образом можно узнать какие параметры использовались для получения данного результата Это демонстрируется в методе comments.
Также там есть базовый класс Sink для обработки результатов. Например, складывания их в mq или базу данных. Он работает в отдельных тасках и получает данные через asyncio.Queue.
class LoggingSink(Sink):
def transform(self, obj):
return repr(obj)
async def init(self):
from loguru import logger
self.logger = logger
async def process(self, obj):
self.logger.info(obj)
return True
sink = LoggingSink(num_tasks=1)
Пример простейшего Sink. Метод transform позволяет провести какие-то манипуляции с объектом и вернуть None, если он нам не подходит. т.е. в тем также можно сделать валидацию.
Sink это асинхронный contextmanager, который при выходе по-идее будет ждать пока все объекты в очереди будут обработаны, потом отменит свои таски.
Ну и, наконец, для связки этого всего вместе я сделал класс Worker. Он принимает один endpoint и несколько sink`ов. Например,
worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()
run запустит asyncio.run_until_complete для pipeline`а worker`а. У него также есть метод transform.
Ещё есть класс WorkerGroup который позволяет создать сразу несколько воркеров и сделать asyncio.gather для них.
В коде есть пример сервера, который генерит данные через faker и обработчиков для его endpoint`ов. Думаю это нагляднее всего.
Всё это на ранней стадии развития и я пока что часто менял api. Но сейчас вроде пришел к тому как это должно выглядеть. Буду раз merge request`ам и комментариям к моему коду.