Собираем статистику по телеграм-каналу и строим кастомные графики
- пятница, 17 ноября 2023 г. в 00:00:25
Привет, Хабр! Меня зовут Вадим, я разработчик и у меня есть свой канал в Телеграме. Как и многие уважающие себя телеграм‑админы, я хочу следить за статистикой: оценивать эффективность, когда что‑то делаю для привлечения подписчиков, мониторить прирост аудитории и знать, какие именно пользователи и когда подписались/отписались.
В Телеграме из коробки можно посмотреть какие‑то графики. Но, например, количество подписчиков отображается с точностью до дня, что не очень удобно. Сделать оттуда drill down до конкретного действия тоже нельзя.
При этом как администратор канала через API я могу получить гораздо больше нужной информации. В этой статье покажу, какие кастомные графики и как я собрал с использованием доступных инструментов: YDB в режиме serverless и DataLens.
По умолчанию нативный график по количеству подписчиков в Телеграм выглядит так:
Анализировать в таком режиме те же отписки не получится — их в таком обобщённом графике просто не видно.
Но в первом приближении через API можно дополнительно получать:
ID пользователя;
время события с точностью до секунды;
само событие со своими метаданными (например ссылка, по которой пришёл новый подписчик).
Ещё в первый день активного привлечения подписчиков в свой канал я загорелся идеей написать простой скрипт на Python: он подключался от имени пользователя с админ‑правами, получал все последние события и сохранял их в обычные текстовые файлы в директорию рядом.
Правда, выгружалось всё в каком‑то очень странном формате через .stringify()
в telethon, который выступал таким «чёрным ящиком».
Этот формат был не очень удачным и привёл к проблемам в дальнейшем. На тот момент я этого ещё не знал и пошёл развивать идею дальше.
Для своей разработки я выбрал Go и использовал библиотеку gotd, как самую популярную и активно поддерживаемую.
Всё должно было работать примерно так: мы раз в минуту запрашиваем новые события и пользователей из них, а дальше просто записываем в базу данных.
func main() {
// init
go AdminLogUpdater(ctx, time.Minute, channelID)
}
func AdminLogUpdater(ctx context.Context,
interval time.Duration, channelID int64) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
err := updateAdminLog(ctx, channelID)
if err != nil {
// logging
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}
func updateAdminLog(ctx context.Context, channelID int64) error {
resp, err := GetChannelAdminLog(ctx, channelID)
if err != nil {
return err
}
storage.AddEvents(resp.Events)
storage.AddUsers(resp.Users)
return nil
}
Здесь вскрылась проблема с telethon, который многое делал внутри себя и не показывал. Так что кода для gotd пришлось сделать больше.
В Telegram есть два типа аккаунтов.
Боты, которым @BotFather сразу выдает ID и токен. Их можно подставить в URL, и получится полный доступ к Bot API.
Обычные юзеры с гораздо более сложным процессом аутентификации. Для него сначала нужен номер телефона и пароль, а потом второй фактор: нужно получить код либо через уже авторизованное устройство, либо через SMS.
При этом полученной сессией не выйдет пользоваться сразу на двух устройствах, её заблокируют. Также у неё есть время жизни, после которого она может сама стать неактуальной и придётся перелогиниться.
Несмотря на сложности, в своей разработке я решил использовать обычный аккаунт, потому что это интереснее. Да и какие‑то нужные функции могли быть недоступны от имени бота.
Чтобы клиенту получить какую‑то информацию об аккаунте или чате, нужно знать AccessHash — он используется как своего рода линкер между двумя пользователями, который никогда не меняется. Его дают, если произошёл коннект: например, в списке последних активностей в канале можно нажать на имя нового подписчика и перейти в его профиль.
К примеру, если аккаунт А успешно подписался на канал X, то нам будет выдан AccessHash = 123. А если аккаунт Б подписался на канал X, то будет сгенерирован AccessHash = 456. И даже если аккаунт Б завладеет AccessHash, это не поможет ему получить неавторизованный доступ к аккаунту А, потому что для каждой пары пользователь‑пользователь хэш будет уникальным.
Механизм необходим для приватности: иначе можно было бы перебрать все ID пользователей, каналы и чаты в цикле, а дальше использовать это, например, для рассылки спама. Поэтому чтобы получить какую‑то информацию о пользователе, нужно получить от него разрешение и узнать хэш.
В telethon эти хэши были спрятаны, хранились в базе SQLite и подставлялись сами. В нашем случае AccessHash будет служить доказательством, что пользователь уже «знаком» с нами и таким образом подтвердил желание взаимодействовать с нашим каналом. Если мы хотим обратиться к пользователю от имени нашего канала через библиотеку, то AccessHash должен быть под рукой.
Для хранения данных я использовал Serverless YDB в Yandex Cloud — и не только потому, что футболку подарили.
Поскольку это pet‑проект, в первую очередь, выбирал доступные по цене решения.
В serverless‑режиме для базы данных не нужен выделенный требующий обслуживания сервер, а тарифицируются только запросы и хранилище данных. С учётом лимитов, где первый миллион запросов в месяц и первый гигабайт данных будут бесплатными, этим могут пользоваться небольшие проекты и совсем без бюджета.
При этом в сервисе есть все необходимые механизмы для безопасного управления доступом и защиты данных — в контексте этой статьи я не буду останавливаться на этом подробно, но рекомендую ознакомиться с документацией по теме.
Для первоначальной задумки нужно хранить только два типа данных: пользователи и события.
Для пользователей будем хранить минимально необходимую информацию: ID, AccessHash, имя (для удобства сопоставления пользовательских действий). При желании можно настроить другой набор данных, главное, — опять же, не забыть позаботиться о правилах приватности и безопасности хранения.
Для событий храним ID, время, ID пользователя, тип события и все метаданные в формате JSON.
Пока что golang‑migrate не умеет работать с YDB (если кто‑то очень хочет научить, можно написать мне, и я перенаправлю на нужного человека). Зато если попробовать создать таблицу, которая уже существует, мы не получим никаких ошибок. Поэтому можно делать CREATE TABLE
операции при каждом запуске.
CREATE TABLE events (
id Int64 NOT NULL,
date Timestamp,
user_id Int64,
action_type Utf8,
action Json,
PRIMARY KEY (
id
)
);
CREATE TABLE users (
id Int64 NOT NULL,
access_hash Int64,
first_name Utf8,
PRIMARY KEY (
id
)
);
Чтобы всё работало быстрее, тут мы можем использовать UPSERT
(это как INSERT ... ON CONFLICT DO UPDATE
). Так, если уже существует запись, например, о пользователе, мы просто её обновим.
UPSERT INTO events (
id, date, user_id, action_type, action
) VALUES (
$id, $date, $user_id, $action_type, $action
);
UPSERT INTO users (
id, access_hash, first_name
) VALUES (
$id, $access_hash, $first_name
);
Делаем интерфейс Storager с функциями AddEvent и AddUser:
type Storager interface {
AddEvent(ctx context.Context, event Event) error
AddUser(ctx context.Context, user User) error
}
После чего запускаем, заходим в консоль и видим наши данные!
От telethon осталась база данных, в которой есть ID и AccessHash пользователей, с которыми он взаимодействовал. Сохраняем их в csv, а дальше получаем остальное из Телеграма и добавляем в базу.
f, err := os.Open(filePath)
if err != nil { ... }
defer func() { _ = f.Close() }()
csvReader := csv.NewReader(f)
people, err := csvReader.ReadAll()
if err != nil { ... }
for _, person := range people {
var userID, userAccessHash int64
var user User
userID, err = strconv.ParseInt(person[0], 10, 64)
if err != nil { ... }
userAccessHash, err = strconv.ParseInt(person[1], 10, 64)
if err != nil { ... }
// sleep to prevent FLOOD_WAIT error
time.Sleep(5 * time.Second)
user, err = telegram.GetUser(ctx, userID, userAccessHash)
if err != nil { ... }
err = ydb.AddUser(ctx, user)
if err != nil { ... }
}
Получилось 284 строки.
Тут уже не надо будет ничего получать из API телеграма, попробуем импортировать через YDB CLI.
Файлы с событиями выглядят так:
ChannelAdminLogEvent(
id=1234567890,
date=datetime.datetime(2023, 8, 15, 15, 30, 45, tzinfo=datetime.timezone.utc),
user_id=1234567890,
action=ChannelAdminLogEventActionParticipantJoinByInvite(
...
)
)
И чтобы с ними работать, достаточно присвоить к какой‑нибудь переменной без дополнительных преобразований. Python интерпретируемый, поэтому делаем так через eval, собираем в JSON и выводим:
dir_name = "actions"
for action in os.listdir(dir_name):
v = "".join(open(f"{dir_name}/{action}").readlines())
exec(f"a = {v}")
action_type = json.loads(a.action.to_json())["_"]
action_type = action_type[0].lower() + action_type[1:]
action = {
"id": a.id,
"date": a.date.isoformat(),
"user_id": a.user_id,
"action_type": action_type,
"action": a.action.to_json(),
}
print(json.dumps(action))
Для импорта нам нужен файл, где каждая строка будет соответствовать будущей строке в таблице (массив объектов не подойдёт). Поэтому сохраняем в файл прямо так:
> python3 parse.py > events.json
Теперь импортируем, используя yc и YDB:
# активируем профиль с нужным облаком
> yc config profile activate my-cloud
# проверяем, что мы видим базу данных
> yc ydb database list --folder-id <folder-id>
+------------+-----------+---------
| ID | NAME | ...
+------------+-----------+---------
| .... | channeler | ...
+------------+-----------+---------
# генерируем iam-токен и сохраняем рядом
> yc iam create-token > my-token
# импортируем через ydb файл events.json в таблицу events
> ydb --iam-token-file my-token \
> -e grpcs://ydb.serverless.yandexcloud.net:2135 \
> -d /ru-central1/... \
> import file json -p myy_events events.json
Elapsed: 0.342391 sec
Получилось 465 строк.
Заходим в DataLens и создаём подключение к нашей базе данных YDB:
Создаём датасет, в котором связываем поля между собой:
Так как события сохранялись не с создания канала, то задаём параметр с количеством подписчиков на начало подсчета.
Добавляем параметр subscribers_delta
, который показывает изменение количества подписчиков после какого-то события:
CASE [action_type]
WHEN "channelAdminLogEventActionParticipantJoin" THEN 1
WHEN "channelAdminLogEventActionParticipantJoinByInvite" THEN 1
WHEN "channelAdminLogEventActionParticipantJoinByRequest" THEN 1
WHEN "channelAdminLogEventActionParticipantLeave" THEN -1
ELSE 0
END
Дальше добавляем параметр subscribers_count
, который показывает количество подписчиков на момент какого-то события:
[initial_subscribers_count] + RSUM(SUM([subscribers_delta]), "asc" ORDER BY [date])
Теперь у нас есть всё, чтобы сделать самый главный чарт — подробный график количества подписчиков!
Например, тут видно дни, когда я активно рекламировал канал, приставая к людям. Или отписки после подведения итогов розыгрыша ровера.
Ещё можно сделать табличку, где видно всех подписчиков или не подписчиков:
Количество событий = COUNT([event_id])
Последнее событие = MAX([date])
Подписан или нет = SUM([subscribers_delta])
Теперь очень круто было бы отдельно считать статистику и строить графики по инвайт‑ссылкам.
Также к каналу привязан чат, в котором тоже есть события и комментарии пользователей, которые можно сохранять и анализировать. Например, скармливать их какому‑нибудь GPT и понимать, какие посты больше интересны подписчикам, а какие только уменьшают их количество.