javascript

Event-Driven подход в пет-проекте: автоматизация Telegram-канала на NiFi, Kafka и n8n

  • вторник, 3 марта 2026 г. в 00:00:03
https://habr.com/ru/articles/1005602/

Привет, Хабр! Хочу рассказать про один странный пет-проект, который немного вырвался из-под контроля.

Всем кто хочет подтянуть английский без напрягов сюда ))

Все описанные потоки можно попробовать в github Скачивайте, ставьте звездочки)

Началось всё обычно: есть VPS (2 ядра, 6 ГБ RAM, 40 GB NVMe), есть свободное время и желание сделать что-то полезное. А ещё есть давняя хотелка — попробовать Kafka в реальном бою. Ну и Telegram-канал для изучения английского как-то сам напросился: новости BBC, разбор лексики, викторины — вроде не сложно, но и не совсем hello world.

Спойлер: Kafka я попробовал, канал работает до сих пор, а архитектура получилась немного безумной — с двумя очередями и разделением ответственности, которое я буду защищать в комментариях. Под катом — почему n8n не справился бы в одиночку, как подружить NiFi с расписанием и зачем я заставляю DeepSeek всегда класть правильный ответ в индекс 0.

С чего всё началось

У меня был VPS с Ubuntu, Docker и желание автоматизировать всё, что автоматизируется. Исходные вводные:

  • Железо: 2 vCPU, 6 GB RAM, 40 GB NVMe — скромно, но для экспериментов хватает.

  • Задача: телеграм-канал с новостями на английском (только культура и искусство, политику не трогаем).

  • Регулярность: пост раз в 3 часа с 9 до 22.

  • Контент: сама новость, разбор лексики/грамматики и викторина для закрепления.

  • Условие: никого не дёргать, всё само.

Источник данных нашёлся быстро — BBC News API. Бесплатный прокси к BBC, отдаёт свежие статьи в json. Идеально.

Почему не один инструмент

Самый простой путь — взять n8n, настроить триггер по расписанию, прокинуть через HTTP-запросы к DeepSeek и Telegram. И это реально сработало бы. Но было два "но":

  1. Я хотел разобраться с Kafka. Не на абстрактных логах, а в живом проекте.

  2. Меня напрягала хрупкость расписания: если в 10 утра API ляжет, пайплайн встанет. А если, наоборот, отдаст 20 статей вместо 5, n8n их все прочитает и... что делать дальше? Городить БД для хранения очереди?

Так родилась идея разделить ответственность: пусть один сервис отвечает за сбор и фильтрацию, другой — за темп публикации, третий — за взаимодействие с LLM и телеграмом. Получилась небольшая event-driven архитектура, где:

  • NiFi забирает статьи раз в сутки, фильтрует и складирует во внутреннюю очередь.

  • Kafka работает как буфер между NiFi и n8n.

  • n8n просто реагирует на события и публикует посты.

Архитектурная схема (Archimate) выглядит так:

Архитектура проекта в общем виде
Архитектура проекта в общем виде

NiFi: слой сбора и внутренняя очередь

Общий вид потока в NiFi
Общий вид потока в NiFi

Я давно работаю с NiFi и неплохо знаю этот инструмент. Мой пайплайн в NiFi выглядит как граф из процессоров:

  • В 10 утра дёргаем BBC API.

  • Полученный массив разбиваем на отдельные статьи (SplitJson).

  • Фильтруем: только раздел "art", только с описанием, только с картинкой.

  • Обогащаем метаданными (UpdateAttribute) и кладём во внутреннюю очередь.

Дальше начинается самое интересное. Между фильтрацией и публикацией в Kafka стоит процессор ControlRate, который выпускает из очереди ровно одну статью раз в 3 часа. То есть внутри NiFi образуется вторая очередь (первая — это сам FlowFile Queue), и именно она задаёт темп публикации.

Почему я не опубликовал всё сразу в Kafka? Потому что тогда темп пришлось бы контролировать на стороне n8n, а это сложнее: нужно хранить offset'ы, думать о сбоях, равномерности. А так — NiFi делает ровно то, что умеет лучше всего: отдаёт данные порционно по расписанию.

Kafka: буфер, который всё стерпит

Kafka у меня тоже в Docker, в модной конфигурации KRaft (без Zookeeper). Один топик bbc-news-topic, одна партиция, retention 7 дней — на случай, если я уеду в отпуск, а канал должен работать.

Сюда NiFi публикует статьи, сюда же стучится n8n. Kafka в этой схеме — просто труба. Но труба надёжная: если n8n упадёт, сообщения никуда не денутся. Если NiFi временно недоступен, n8n всё равно дочитает то, что уже есть в топике.

Для мониторинга поставил AKHQ.io — удобно смотреть, что там в топиках происходит и не забилось ли всё.

Две очереди: почему это не оверхед, а архитектура

Теперь к главному, за что меня, возможно, будут ругать в комментариях. У меня две очереди:

  1. Внутренняя очередь NiFi (там лежат отфильтрованные статьи).

  2. Kafka (там лежат сообщения для n8n).

Можно было сделать проще: NiFi публикует всё утром в Kafka, а n8n забирает по таймеру раз в 3 часа. Но тогда возникают вопросы:

  • Если n8n прочитает все 10 статей сразу (потому что они есть в топике), а публиковать будет раз в 3 часа, то где хранить состояние? Какие статьи уже обработаны, а какие ещё нет? Придётся лепить базу данных или использовать state-менеджмент n8n, который для такого сценария не очень удобен.

  • Если n8n упадёт после чтения, но до публикации, offset уедет и статья потеряется навсегда.

  • Если в понедельник API отдаст 2 статьи, а во вторник — 15, паузы между постами станут неконтролируемыми.

Моё решение с двумя очередями эти проблемы решает:

✅ NiFi отвечает за темп. Он выпускает статьи строго раз в 3 часа, независимо от того, сколько их накопилось.
✅ n8n просто реагирует на события. У него одна задача: получить сообщение и обработать. Никакого тайминга, никакого хранения состояния.
✅ Offset'ы коммитятся только после успешной публикации всего цикла (новость + разбор + викторина). Если что-то пошло не так, сообщение остаётся в Kafka и будет обработано при следующем запуске.
✅ Если статей мало — очередь внутри NiFi опустеет, и последняя статья уйдёт по расписанию. Если много — они спокойно ждут своей очереди внутри NiFi.

Минусы? Конечно. Это сложнее, чем один скрипт на Python. Это требует понимания и NiFi, и Kafka, и n8n. Это оверхед для канала с нулем подписчиков. Но проект-то учебный — я хотел попробовать инструменты и построить архитектуру, которую потом можно масштабировать. И в этом контексте две очереди — не баг, а фича.

n8n: мозги проекта

Общий вид Workflow в n8n
Общий вид Workflow в n8n

Если NiFi и Kafka — это "логистика", то n8n — это "креатив". Здесь происходит вся магия с LLM и Telegram.

Воркфлоу слушает Kafka-топик (Kafka Trigger). При появлении сообщения запускается цепочка из семи шагов.

Парсинг сообщения из Kafka

Kafka отдаёт сообщение как строку, поэтому первый шаг — превратить строку в объект. Без этого никак.

const messageString = $input.first().json.message;
const parsed = JSON.parse(messageString);
return { ...$input.first().json, ...parsed };

Промпт №1 — добавить хэштеги

Первый промпт — самый простой. Нужно взять уже отформатированный текст новости и добавить в конец 4 хэштега. Никакого творчества, просто "оформи подпись". DeepSeek справляется без проблем.

Скрытый текст
Роль: Ты — редактор новостного Telegram-канала.

Задача: Оформи подпись к фотографии для новостного поста, не добавляй ничего нового, только красивое форматирование.

Исходные данные:
- Оформленная новость:{{ $json.formattedPost }}

Требования:
- 4 хештега в конце (с заглавной буквы)
- Ничего не добавляй в текст новости

Формат:
Оформленная новость
#Хештег1 #Хештег2 #Хештег3 #Хештег4

Скачивание картинки

Почему картинка качается после подготовки основного текста, а не раньше? Потому что тащить бинарные данные через узлы DeepSeek — то ещё удовольствие. Проще скачать перед самой отправкой в Telegram. HTTP Request к image_link — и готово.

Промпт №2 — разбор лексики и грамматики

Вот тут пришлось повозится. DeepSeek может выдать пост на 5000 знаков, а Telegram режет всё, что длиннее 4096. Пришлось жёстко ограничить промпт: не длиннее 3800 символов.

Остальные требования родились из экспериментов:

  • Живые примеры, а не сухие определения.

  • Произношение сложных слов в квадратных скобках.

  • Группировка по темам (глаголы, существительные, фразы).

  • Чистый HTML, потому что Telegram его понимает, а markdown — нет.

Скрытый текст
Роль: Ты — креативный и увлечённый преподаватель английского языка. Твои студенты обожают тебя за то, что ты делаешь сложное — простым, а скучное — интересным.

Задача: Создай подробный разбор лексики и грамматики на основе этой новости. Это будет отдельный пост, который идёт сразу после новости с фото.

Исходные данные:
- Заголовок новости: {{ $('Code in JavaScript').item.json.title }}
- Краткое содержание: {{ $('Code in JavaScript').item.json.summary }}

⚠️ ТЕХНИЧЕСКОЕ ОГРАНИЧЕНИЕ: Весь пост должен быть не длиннее 3800 символов.

ТВОЙ СТИЛЬ:
- Объяснения живые, с примерами, с любовью к языку
- Выбирай 6-7 самых интересных слов и 2 грамматические конструкции (можно больше)
- Для каждого слова: яркое объяснение + 1-2 живых примера
- Добавляй произношение для сложных слов в квадратных скобках
- Используй аналогии и жизненные ситуации в примерах
- Группируй слова по темам, если это уместно
- Добавляй эмодзи для оживления, но не перебарщивай (📚 ✨ 💡)

СТРУКТУРА:

1. ЗАГОЛОВОК (привлекающий внимание):
   Например: <b>📚 Разбираем лексику из новости о NASA</b> или <b>✨ Слова, которые сделают твой английский космическим</b>

2. ВВЕДЕНИЕ (1-2 предложения):
   Например: "Привет, друзья! 👋 Давайте заглянем «под капот» этой короткой новости и найдём настоящие языковые сокровища."

3. ОСНОВНАЯ ЧАСТЬ — ЛЕКСИКА:
   Для каждого слова:
   - <b>Слово</b> [произношение] — объяснение (на русском)
   - <i>Пример 1</i> (курсивом)
   - <i>Пример 2</i> (если есть)
   
   Можно группировать с помощью жирных заголовков:
   <b>🚀 Ключевые глаголы</b>
   <b>🌕 Полезные существительные</b>
   <b>💫 Интересные фразы</b>

4. ГРАММАТИЧЕСКИЙ РАЗДЕЛ:
   <b>✨ Грамматика: название конструкции</b>
   Простое объяснение (на русском)
   <i>Пример из новости или жизни</i>

5. ЗАКЛЮЧЕНИЕ (по желанию):
   Короткое напутствие, вопрос к студентам

ТРЕБОВАНИЯ К ФОРМАТИРОВАНИЮ (Telegram HTML):
- Для <b>жирного текста</b> используй тег <b>
- Для <i>курсива</i> используй тег <i>
- Разделяй смысловые блоки пустыми строками
- Объяснения — на русском, примеры — на английском
- НЕ ИСПОЛЬЗУЙ символы ### для заголовков — они не работают в Telegram
- НЕ ИСПОЛЬЗУЙ двойные звёздочки ** — они не работают в HTML-режиме
- Все теги должны быть правильно закрыты (<b>текст</b>, а не <b>текст)

ВАЖНО: Весь пост должен быть в HTML-формате, готовом для отправки в Telegram с parse_mode=HTML.

Промпт №3 — викторина

Второй по сложности промпт. Здесь главная проблема — заставить модель отдавать правильный ответ строго под индексом 0.

Почему? Потому что потом я сам перемешиваю варианты на стороне n8n. Если доверить рандомизацию модели, результаты могут быть любыми: иногда правильный ответ будет на первом месте, иногда на последнем, а иногда модель вообще вернёт строку, которую сложно распарсить.

Формат ответа — строгий JSON:

{
  "quiz": {
    "question": "Вопрос на английском",
    "options": ["правильный", "неправильный1", "неправильный2", "неправильный3"],
    "explanation": "Объяснение на русском до 200 символов"
  }
}

Магия JavaScript: парсинг и перемешивание

DeepSeek старается, но иногда оборачивает JSON в markdown-блоки: json ... . Приходится чистить.

Очистка:

const rawText = $input.first().json.text;
const cleaned = rawText
  .replace(/```json\s*/g, '')
  .replace(/```\s*$/g, '')
  .replace(/`/g, '')
  .trim();
const quizData = JSON.parse(cleaned);
const quiz = quizData.quiz || quizData;

А дальше — самое интересное. Забираем правильный ответ из индекса 0, перемешиваем рандомно все варианты и возвращаем новый индекс.

const quiz = $input.first().json.quiz;
const correctAnswer = quiz.options[0];
const shuffled = [...quiz.options];
for (let i = shuffled.length - 1; i > 0; i--) {
  const j = Math.floor(Math.random() * (i + 1));
  [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
}
const correctIndex = shuffled.indexOf(correctAnswer);
return {
  question: quiz.question,
  options: shuffled,
  correct_option_id: correctIndex,
  explanation: quiz.explanation
};

Стоит отметить, что в n8n нет готовой ноды для Telegram Quiz, поэтому отправляем прямой HTTP-запрос к Bot API.

Что получилось в итоге

Каждые 3 часа в канале появляются три поста:

  1. Новость с фото и хэштегами — просто красивая подача.

  2. Разбор лексики и грамматики — живой текст с примерами, эмодзи и произношением.

  3. Викторина — закрепляет материал, варианты всегда перемешаны.

Пост с основной новостью
Пост с основной новостью
Пост с объяснением лексики и грамматики
Пост с объяснением лексики и грамматики
Пост с викториной
Пост с викториной

Грабли, на которые я наступил (чтобы вы не наступали)

  1. LLM оборачивает JSON в markdown. DeepSeek считает своим долгом добавить json ... даже когда его просят не надо. Пришлось чистить регуляркой.

  2. Telegram не принимает длинные сообщения. Если вариант длиннее N символов — ошибка. Добавил обрезку.

  3. n8n и бинарные данные — плохая история. Пробрасывать картинки через ноды LLM — мучение. Проще скачивать перед отправкой.

Стоило ли оно того?

Если смотреть прагматично — можно было сделать проще. n8n + Python-скрипт на cron решили бы задачу за выходные. Но тогда я бы не пощупал Kafka в деле и не получил бы архитектуру, которую действительно интересно рассказывать. Впрочем и текущее решение тоже сделано за 3 дня: первый день настройка образов в Docker Compose, второй день - настройка пайплайнов, третий день - отладка и запуск.

Этот проект для меня был про "попробовать новое". И новое — это не только DeepSeek с его промптами, но и Kafka с её offset'ами, и n8n с его ограничениями.

Вопросы, критика, идеи — добро пожаловать в комментарии.