https://habr.com/ru/company/dentsuaegisnetworkrussia/blog/516886/- Блог компании Dentsu Aegis Network
- Python
- Big Data
- Визуализация данных
Мы хотим рассказать вам о том, как мы помогли нашей BI-команде организовать автоматический процесс доставления данных на Tableau-сервер из MongoDB, используя таблошный формат хранения данных ”hyper”, а сам процесс настройки формирования данных осуществляется через простой веб-интерфейс.
В начале коротко расскажем, как выглядел процесс до и после того, как мы научили наш внутренний продукт А1 программно собирать датасорсы и публиковать их на Tableau Server. Затем подробнее разберем проблему BI-команды и найденное решение, а также заглянем под капот (здесь о создании .hyper файла, публикации файла на tableau-сервере и обновлении хайпера). Добро пожаловать под кат!
Мы в рекламной группе DAN много работаем с данными мониторинга рекламы от
Mediascope, индустриального измерителя на медиарынке. Сценарии бывают разные: некоторые сотрудники выгружают сырые данные, другие пользуются готовыми предобработанными базами, а кто-то заказывает разработку автоматизированных дашбордов на этих данных. О последнем сценарии расскажем подробнее – дашборды наши BI-разработчики собирают в Tableau, но прежде чем начать «рисовать», им тоже необходимо привести данные к нужному удобному для разработки формату.
Жизненный путь данных от сырья до красивых автоматизированных графиков можно условно разбить на 4 шага:
- Получение сырых данных
- Чистка и доработка данных
- Создание источников данных для Tableau
- Разработка визуализаций
Было
До того, как мы научились программно генерировать источники данных для Tableau, процесс выглядел так:
1. Получение сырых данных
Пользователи формируют табличные-отчеты через внутренний инструмент А1. О нем мы подробнее расскажем далее.
2. Чистка и доработка данных
Возможность трансформации данных также заложена в инструмент А1, после чего очищенные данные можно выгрузить в xslx/csv и продолжить с ними работу вне инструмента. Тут стоит отметить, что некоторые пользователи ограничиваются 1ым пунктом и после выгрузки отчетов дорабатывают данные своими силами.
3. Создание источников данных для Tableau
Раньше заказчики дашбордов приходили с набором экселей, которые они сгенерировали на предыдущих пунктах. А BI-разработчики сводили эти эксели в единый датасорс (таблошный сленг) своими силами. Не всегда удавалось ограничиться только инструментами Tableau, часто писали скрипты на Python.
4. Разработка визуализаций
Наконец, вершина айсберга – создание дашборда и публикация его на Tableau Server, где его увидит заказчик. На практике визуализация часто занимает меньше времени, чем сбор данных и настройка обновлений.
Боль копилась на третьем шаге, так как росло количество кастомных решений, которые было затратно поддерживать и реализовывать. Также регулярно просачивались ошибки в данных со второго шага – промежуточный эксель между двух систем (А1 и Tableau) прямо-таки подталкивал пользователя: «давай поправь что-нибудь ручками, никто не заметит».
Стало
Основной задачей было исключить эксели между 2 и 3 шагом. В итоге мы научили А1 собирать датасорсы и публиковать их на Tableau Server. Вот что получилось:
Сейчас шаги с 1 по 3 происходят в А1, на выходе BI-команда получает опубликованный на Tableau Server датасорс для разработки визуализаций. Связующим звеном стал Hyper API, о котором дальше и пойдет речь.
Результаты
Уменьшили кол-во узлов при работе в разных инструментах. Сейчас ошибиться где-то в процессе сложнее, а отловить, где произошла ошибка проще, расследование сбоев занимает меньше времени. Система предупреждает пользователей о распространенных ошибках.
Освободили время BI команды. Раньше было мало шаблонных решений и много кастомизаций. Чаще всего под каждый проект дописывали обработку на Python. В редких случаях, где обработка не нужна была, работали сразу в Tableau Desktop (основной инструмент разработки).
Сейчас подготовка датасорса это: накликать нужные поля в интерфейсе А1, отметить, какие из них разворачиваем в строки (если это необходимо) и опционально заранее определить тип полей.
Не нагружаем Tableau Server обновлением громоздких датасорсов – обновление происходит силами А1, а на сервер закачивается уже готовый hyper.
*Бонус – стимулируем пользователей работать внутри А1. Если раньше некоторые пользователи после выгрузки сырых отчетов, дорабатывали их вручную вне инструмента, то теперь, так как весь процесс с 1 по 3 шаг происходит в А1, пользователям проще настраивать процесс чистки там же.
Проблема и решение
Немного о А1
Прежде чем мы начнем рассказывать про наше решение, нужно рассказать о нашем внутреннем продукте А1, к которому прикрутили генерацию датасорсов.
А1 – это внутренний продукт компании, который призван упростить рабочий процесс сотрудникам, у которых основная работа заключается в следующем:
- Забирать данные из программных продуктов компании MediaScope
- Приводить эти данные (чистить) в удобный для аналитиков-предметников вид
- По необходимости подготавливать данные для создания дашбордов (об этом мы сегодня и поговорим)
После того, как пользователь завершает “чистку” данных, они хранятся в системе А1. В нашей терминологии это называется «Контейнером». Контейнер – это обычный документ в MongoDB, который нам и надо передавать на Tableau-сервер.
Проблема BI-команды
Нашей команде BI-разработчиков нужно было как-то получать данные из А1, которые хранились в MongoDB, и на основе полученных данных строить дашборды. В первую очередь мы попробовали забирать данные из MongoDB штатными средствами табло, но проблему это не решало:
- Поскольку данные хранятся в MongoDB, то на вход в табло поступают данные с произвольной структурой, а это значит, что постоянно пришлось бы заниматься поддержкой данной логики.
- Для агрегации данных из MongoDB нужно было тащить определенные записи из коллекции, а не коллекцию целиком – драйвер Tableau делать это не умеет.
- Кроме всего прочего, мало было получить данные: иногда их нужно было «разворачивать» – делать «unpivot» некоторых столбцов в строки. Что тоже не так просто было сделать, от слова совсем.
Что мы придумали
Было принято решение попробовать решить данную задачу своим велосипедом, используя библиотеку
Tableau Hyper API. Данная библиотека позволяет создавать файл в формате .hyper, в который легко складывать данные, а потом использовать как источник данных для создания дашборда на табло сервере.
Как описывают хайпер сами разработчики табло:
Hyper – это высокопроизводительный in-memory механизм обработки данных, который помогает клиентам быстро анализировать большие или комплексные датасеты, эффективно оценивая запросы в базу данных. Основанная на платформе Tableau, Hyper использует собственные методы динамической генерации кода и передовые технологии параллелизма, позволяющие достигать высокой производительности при создании экстрактов и выполнение запросов.
Примерный процесс работы в нашей программе выглядит следующим образом:
- Пользователь выбирает контейнеры и нужные колонки
- Система выдергивает из контейнеров данные
- По полученным данным система определяет типы колонок
- Инициализируется создание хайпера и вставка в него данных
- Загружается хайпер на табло-сервер
- BI-разработчики видят хайпер на сервере и создает на его основе дашборд
Когда в контейнеры зальют новые данные системе будет подан сигнал, что нужно обновить хайпер:
- Система скачает с табло-сервера хайпер
- Заберет из MongoDB свежие данные и обновит хайпер
- После система загружает на сервер новый хайпер с перезаписью существующего
- Пользователю достаточно просто нажать на кнопку «Refresh», чтобы в дашборде отобразилась актуальная информация
Что видит пользователь
Как уже говорилось ранее, А1 является веб-приложением. Для создания сервиса генерации хайпера на фронте мы использовали Vue.js и Vuetify.
Интерфейс приложения разделен на три экрана.
На первом экране пользователь выбирает нужные контейнеры и колонки.
Если включена опция «Unpivot», то в хайпере будут созданы две дополнительные колонки: variable- наименования колонок, которые выбираются столбцом Metrics и values- значения из этих колонок.
Столбец Dimension добавляет в хайпер колонку с одноименной выбранной колонкой. Количество выбранных колонок Dimensions и их названия должны быть одинаковые во всех контейнерах, чтобы не нарушилась целостность таблицы в хайпере, поэтому присутствует столбец «Имя hyper», который позволяет указать имя выбранной колонке, если в контейнерах они называются по-разному.
На этом процесс настройки хайпера заканчивается. Пользователю достаточно перейти на второй экран, нажать «Создать hyper» и наблюдать за ходом событий в логах.
На третьем экране находятся дополнительные настройки:
- Можно включить игнорирование обновлений, если нам не нужно, чтобы система автоматически обновляла хайпер
- Можно указать email, на который отправлять отчеты об обновлениях
- Можно руками указать тип данных для колонки values (используется только при unpivot режиме): float, string или автоматически определять системой (про типы поговорим дальше)
- Также можно указать типы данных для выбранных колонок у контейнеров.
Что под капотом
А1 написан на Python. Для работы с данными мы используем Pandas, а сами данные мы сериализуем из pandas в pickle и храним в MongoDB GridFS.
Когда поступает команда на создание хайпера, система выполняет следующие операции:
- Выгружает из MongoDB все необходимые контейнеры и десиреализует данные в датайфремы pandas
- Производит подготовку данных: оставляет в датафреймах только нужные колонки, дает им новые имена, при необходимости разворачивает таблицы через pandas.melt
- Если пользователь выставил тип данных у колонок, то произвести конвертацию данных либо во float32, либо в string
- После всех подготовительных работ с данными система через hyper api создает файл и через tabcmd отправляет файл на табло-сервер.
Стоит немного поговорить о типах данных у колонок. Одной из особенностей хранения данных в контейнерах А1 является то, что пользователи не заморачиваются над тем, какие типы назначать колонкам, за них это прекрасно делает pandas: система спокойно справляется с ситуациями, когда в колонке присутствуют числа и строковые значения. Однако хайперу это не нравится: если сказать ему, что колонка должна иметь тип int, то система ругнется при попытке вставить что угодно кроме целого числа. Поэтому было принято решение использовать в хайперах только два типа данных: string и float.
Итак, с общим принципом работы мы разобрались, давайте поговорим уже про саму работу с hyper.
Создание .hyper файла
Для работы с Hyper API понадобится установить библиотеку, скачать ее можно с официально сайта
тут. Там же есть неплохие примеры, как работать с этим инструментом. Мы же кратко укажем основные моменты.
Сам файл хайпера из себя представляет эдакую базу данных, чем-то напоминает SQLite.Через api можно обращаться к данным, используя like SQL синтаксис:
f"SELECT {escape_name('Customer ID')} FROM {escape_name('Customer')}"
Поскольку система у нас написана на Python, то и библиотеку мы будем использовать для соответствующего языка. При создании файла мы должны указать имя схемы, имя таблицы и колонки с типами. Имя схемы и таблицы должны называться “Extract”, поскольку именно в эту схему с таблицей залезает Tableau Server, чтобы вытащить данные для книжек.
with HyperProcess(Telemetry.SEND_USAGE_DATA_TO_TABLEAU) as hyper:
with Connection(
hyper.endpoint, self.fullpath_hyper, CreateMode.CREATE_AND_REPLACE
) as connection:
connection.catalog.create_schema("Extract")
main_table = TableName("Extract", "Extract")
example_table = TableDefinition(main_table)
После создания таблицы нам нужно создать колонки и задать типы. Как мы уже говорили ранее, данные у нас имеют только два типа (float или string), поэтому отталкиваясь от того, какой тип стоит у колонок в датафрейме, такой мы и выставляем для колонок:
for column in dataframe.columns:
if dataframe[column].dtype.name in ("category", "object"):
example_table.add_column(TableDefinition.Column(column, SqlType.text()))
elif dataframe[column].dtype.name in ("float32"):
example_table.add_column(
TableDefinition.Column(column, SqlType.double())
)
connection.catalog.create_table(example_table)
После создания таблицы можно и вставить данные:
with Inserter(connection, example_table) as inserter:
for val in dataframe.values:
inserter.add_row(val.tolist())
inserter.execute()
Здесь мы построчно бежим по датафрейму и накапливаем список значениями через
inserter.add_row(). На самом деле в апи хайпера есть функция
add_rows(), которая принимает список списков и вставляет уже значения. Почему не было так сделано? Ради экономии оперативной памяти: для того чтобы предоставить список списков значений из датафрейма, нужно попросить pandas сделать
values.tolist(). А когда у тебя 150 млн строк данных, получается очень дорогая операция для оперативки, при этом на производительности это никак не сказывается (во всяком случае, не было замечено, что из-за итерационного перебора строк как-то просела скорость создания хайпера). Плюс ко всему,
add_rows() работает как синтаксический сахар: на деле он принимает список списков и так же итерационно добавляет данные.
На этом создание нашего хайпера заканчивается. Дальше нам нужно опубликовать его на сервере.
Публикация файла на tableau-сервере
Для обращения к tableau-серверу мы будем использовать утилиту
tabcmd — это консольная утилита, позволяющая подключаться к серверу и выполнять административные функции – создавать пользователей, группы, книжки и прочее.
Запускать команду tabcmd будем через питоновский subprocess.Popen:
popen = subprocess.Popen(
f'/opt/tableau/tabcmd/bin/tabcmd publish "{fullpath_hyper}" -n "{filename}" -o -r "A1_test" '
'-s http://tableau.domain.com -u "username" -p "password" --no-certcheck',
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
)
return_code = popen.wait()
if return_code:
error = str(popen.communicate()[1])
return f"Ошибка сервера во время публикации файла. {error}"
Мы передаем tabcmd следующую команду и ключи:
- publish: залить файл на сервер
- -n (--name): какое имя файла будет на сервере
- -o (--overwrite): если присутствует файл с таким именем, то перезаписать
- -r “A1_test” (--project): положить файл в папку (он же проект)
- -s (--server): адрес tableau-сервера
- -u -p: логин и пароль для авторизации на сервере
- --no-certcheck: игнорировать проверку SSL-сертификата
Обновление хайпера
Как создавать новый хайпер мы разобрались, но что делать, когда хайпер состоит из десятка контейнеров и в один из них поступили новые данные? Будем обновлять хайпер.
Когда в контейнер поступают новые данные, система смотрит, есть ли хайперы, которые используют этот контейнер. Если есть, то ставится задача на обновление хайпера.
Чтобы понимать, какие данные из какого контейнера лежат в хайпере, система при создании хайпера также создает дополнительную колонку container_id. При таком подходе обновление становится очень простым:
- Забираем файл с сервера
- Удаляем все строки в хайпере, где container_id равняется обновляемому контейнеру
- Вставляем новые строчки
- Загружаем обратно на сервер файл с перезаписью.
Процесс забора файла немного отличается от процесса его загрузки. В первую очередь, с сервера мы будем забирать не .hyper файл, а архив .tdsx, который уже после будем распаковывать и открывать сам .hyper.
Для того, чтобы забрать файл, мы используем tabcmd:
popen = subprocess.Popen(
f'/opt/tableau/tabcmd/bin/tabcmd get "datasources/{filename_tdsx}" '
f'-s http://tableau.domain.com -u "username" -p "password" '
f'--no-certcheck -f "{fullpath_tdsx}"',
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
)
return_code = popen.wait()
if return_code:
error = str(popen.communicate()[1])
return f"Ошибка. {error}"
Тут используем следующую команду и ключи:
- get: забрать с сервера файл. Если на сервере лежит файл test.hyper, то обращаться надо к файлу test.tdsx, а лежат они все в директории datasource (я не смог нагуглить, почему такая особенность работы в табло, если знаете, поделитесь в комментариях )
- -f (--filename): полный путь, включая имя файла и его расширение, куда надо сохранить файл
После того, как файл будет скачен, его надо разархивировать через zipfile:
with zipfile.ZipFile(fullpath_tdsx, "r") as zip_ref:
zip_ref.extractall(path)
После разархивации хайпер будет лежать в директории
./Data/Extracts.
Теперь, когда у нас есть актуальная версия файла, мы можем удалить из него ненужные строки:
table_name = TableName("Extract", "Extract")
with HyperProcess(Telemetry.SEND_USAGE_DATA_TO_TABLEAU) as hyper:
with Connection(hyper.endpoint, self.fullpath_hyper) as connection:
connection.execute_query(
f"DELETE FROM {table_name} WHERE "
f'{escape_name("container_id")}={container_id}'
).close()
Ну а вставка и публикация файла были уже описаны выше.
Заключение
Что в итоге? Проделав работу по внедрению генерации hyper-файлов и автоматической доставки их на tableau-сервер, мы в разы снизили нагрузку на BI-команду, данные в дашборде обновлять стало проще и, самое главное, быстрее. Само знакомство с hyper api не было болезненным, документация неплохо написана, а сама интеграция технологии в нашу систему прошла легко.
Благодарим за внимание! Если у вас есть вопросы или замечания, пожалуйста, оставляйте их в комментариях.
Статья написана совместно с Василием Лавровым (VasilyFromOpenSpace) — Старшим разработчиком бизнес-аналитики