Golang-генератор TSV данных для импорта в ClickHouse
- вторник, 3 октября 2023 г. в 00:00:18
Занимаясь написанием статьи о Data Vault по разведению кроликов, возникла потребность сгенерировать много данных для ClickHouse. Все генераторы, что смотрел - так и не придумал как сделать 50ГБ данных быстро и эффективно с их помощью. Поэтому решил развлечься и субботний день провести со старым другом. Сразу скажу - я не занимаюсь разработкой на Golang. Это скорее хобби. Так что прошу не судить строго.
Итак, задача:
сгенерировать данные для Data Vault от точки N до текущего дня
сделать это в формате TSV, так как ClickHouse просто молниеносно грузит данный формат
генерация должна быть быстрой
данных должно быть много
должны быть поддержаны связи между таблицами
Также приложу схему Vault, для которого все делается. Описание можно почитать по ссылке в начале статьи.
В общем звучит все отвратительно. Если зависимости - значит память. А как без них в БД.
После долгих раздумий за чашкой кофе, я сформулировал для себя концепт будущей утилиты:
данные должны поточно писаться на диск в момент формирования, и пропадать из памяти
в памяти должен храниться только какой-то небольшой интервал времени
Далее было принято решение реализовывать подобие пайпов с использованием потоков Golang, которые должны были генерировать первичные ключи и временной ряд.
Первым делом набросал структуру, которая будет занята генерацией:
type Context struct {
pk string
timestamp_ time.Time
hash string
source_ string
RandomHash []string
}
type TableGenerator struct {
files map[string]*os.File
numPipe <-chan int
datePipe <-chan time.Time
context Context
}
Основная структура TableGenerator собственно будет хранить ссылки на файлы, в которые идет запись, пайпы, а также контекст.
Первым дело создадим два метода, которые позволят инициализировать структуру, а также добавлять файлы для записи.
// Инициалиазция
func (t *TableGenerator) Init() {
t.files = make(map[string]*os.File)
}
// Открыть файл для очередной таблицы
func (t TableGenerator) AddFile(tableName string) *os.File {
f, err := os.Create(fmt.Sprintf("tables/%s.tsv", tableName))
if err != nil {
panic(fmt.Sprintf("TableGenerator.AddFile: %s", err.Error()))
}
t.files[tableName] = f
return t.files[tableName]
}
Первичная инициализация структуры и открытие файлов в главном методе приложения выглядит так:
var generator TableGenerator
generator.Init()
for _, tbl := range tableList {
f := generator.AddFile(tbl)
defer f.Close()
}
После того как файлы открыты, нам потребуется рекурсивный метод по наполнению таблиц. На верхнем уровне мы будем бежать по таблицам родителям - h_cities - и спускаться в каждом новом уровне ниже до h_animals. Таким образом на каждую строку верхнего уровня, будет получен набор дочерних. И мы сможем последовательно писать данные спускаясь вниз и возвращаясь обратно к родителю. Таким образом на нижнем уровне рекурсии достаточно знать контекст только одного родителя, что нам и требуется - меньшее потребление памяти.
горда (1 уровень)
фермы для городов (2 уровень)
кролики для ферм (3 уровень)
Сигнатура метода проста:
func WriteTable (table Table, generator TableGenerator, pContext Context) {
...
}
table - это некие метаданные таблицы. Их я принял решение генерировать из JSON на базе такой структуры:
type Field struct {
Name string `json:"name"` // имя поля
Type string `json:"type"` // тип поля
Values []string `json:"values"` // списко допустимых значений для Типа List
Const string `json:"const"` // константа
Delta int `json:"delta"` // дельта в прошлое в сутках. дата начала генерации
ConcatWithParent int `json:"concatWithParent"` // Сотавной ключ родитель-текущий
LinkField string `json:"linkField"`
}
type Table struct {
Name string `json:"name"` // имя таблицы
LinkName string `json:"linkName"`
Rownum int `json:"rownum"` // фиксированное количество строк
MinRownum int `json:"minRownum"` // количество строк - минимум для рандомного числа
MaxRownum int `json:"maxRownum"` // мксимум строк - минимум для рандомного числа
Fields []Field `json:"fields"` // поля таблицы
Childs []Table `json:"childs"` // дочерние таблицы (рекурсия)
}
type DataBase struct {
Tables []Table `json:"tables"`
}
{
"tables": [
{
"name": "h_cities",
"rownum": 1000,
"fields": [
{"name": "city_code", "type": "pk"},
{"name": "city_hsh", "type": "hash"},
{"name": "timestamp_", "type": "timestamp", "delta": 3000},
{"name": "source_", "type": "string", "const":"R"}
],
"childs": [
{
"name": "h_farms",
"linkName": "l_city_farms",
"minRownum": 1,
"maxRownum": 1000,
"fields": [
{"name": "farm_num", "type": "pk", "concatWithParent": 1},
{"name": "farm_hsh", "type": "hash"},
{"name": "timestamp_", "type": "timestamp"},
{"name": "source_", "type": "string", "const":"R"}
],
"childs": [
{
"name": "h_animals",
"linkName": "l_animal_farms",
"minRownum": 100,
"maxRownum": 10000,
"fields": [
{"name": "animal_num", "type": "pk", "concatWithParent": 1},
{"name": "animal_hsh", "type": "hash"},
{"name": "timestamp_", "type": "timestamp", "start": "parent"},
{"name": "source_", "type": "string", "const":"R"}
],
"childs": [
{
"name": "s_animal_attrs",
"type": "satellite",
"rownum": 1,
"fields": [
{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
{"name": "source_", "type": "copy", "linkField": "parent.source_"},
{"name": "sex", "type": "list", "values": ["М", "Ж"]},
{"name": "color", "type": "list", "values": ["Черный", "Белый", "Красный"]},
{"name": "birthdate", "type": "copy", "linkField": "timestamp_"}
]
},
{
"name": "s_animal_lifecycle",
"type": "satellite",
"rownum": 1,
"fields": [
{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
{"name": "source_", "type": "copy", "linkField": "parent.source_"},
{"name": "status", "type": "list", "values": ["Жив", "Мертв", "Продан", "Продан живым"]}
]
},
{
"name": "l_animal_tree",
"type": "recursion",
"rownum": 1,
"fields": [
{"name": "animal_hsh", "type": "copy", "linkField": "parent.hash"},
{"name": "animal_mother_hsh", "type": "parentRandom"},
{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
{"name": "source_", "type": "copy", "linkField": "parent.source_"}
]
}
]
},
{
"name": "l_farm_referals",
"rownum": 1,
"fields": [
{"name": "ref_farm_hsh", "type": "copy", "linkField": "parent.hash"},
{"name": "attract_farm_hsh", "type": "parentRandom"},
{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
{"name": "source_", "type": "copy", "linkField": "parent.source_"}
]
}
]
},
{
"name": "s_city_attrs",
"type": "satellite",
"rownum": 1,
"fields": [
{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
{"name": "source_", "type": "copy", "linkField": "parent.source_"},
{"name": "name_", "type": "string"}
]
}
]
}
]
}
Итак файлы открыты, начинаем обход JSON. Первым идет заполнение таблицы верхнего уровня - h_cities - она самая маленькая.
Каждая таблица, перед тем как писаться, должна инициализировать пайпы в соответствии с типами полей. Поэтому обходим структуру таблиц:
// откроем пайпы для полей разного типа
for _, fld := range table.Fields {
switch (fld.Type) {
case "pk":
generator.numPipe = GenKey(1, rownum)
case "timestamp":
sdate := pContext.timestamp_
// если есть дельта - то нужно отнять от текущего ее и использовать эту дату для старта timeline
if (fld.Delta > 0) {
sdate = time.Now().AddDate(0, 0, -fld.Delta)
}
generator.datePipe = GenTimeline(1, rownum, sdate)
}
}
Т.е если для генерируемого поля - указан тип pk (первичный ключ), то стартуем пайпу с генерацией числа от 1 до количества строк в таблице. Если тип timestamp - то стартуем временной ряд.
Какие правила выбраны для генерации даты:
для таблиц верхнего уровня (h_cities) указывается delta -N дней от текущей даты. От нее начинают генерироваться даты создания городов
для таблиц уровнем ниже - h_farms берется интервал от даты генерации города до текущего дня. Т.е пока город существует и до сего дня - могут открываться фермы
ну и так далее вниз по иерархии. Переменная pContext.timestamp_ как раз и хранит дату родительской таблицы.
Ну и собственно сама реализация пайпов:
// Генерируем последовательность чисел от snum до enum
func GenKey(snum, enum int) <-chan int {
numPipe := make(chan int)
// функция - поток, которая будет играть роль пайпы для генерации числового ряда
go func() {
for i := snum; i <= enum; i++ {
numPipe <- i
}
close(numPipe)
}()
return numPipe
}
// Данный пайплайн нужен, чтобы реализовать механизм генерации по временному ряду
func GenTimeline(snum, enum int, sdate time.Time) <-chan time.Time {
timePipe := make(chan time.Time)
// функция - поток, которая будет играть роль пайпы для генерации временного ряда
go func() {
// первым этапом сделаем равномерный ряд (неравномеризацию будем накручивать следом)
// просто рассчитаем шаг каждой строки и будем его прибавлять к стартовой дате
step := (time.Now().Sub(sdate).Hours() / 24)/(float64)(enum - snum + 1)
for i := snum; i <= enum; i++ {
timePipe <- sdate.AddDate(0, 0, (int)(step * (float64)(i)))
}
close(timePipe)
}()
return timePipe
}
Первый метод прост - он выкидывает по требованию основного потока число от 1 до N. Это сделано для того, чтобы разгрузить основной код от счетчиков и прочего ненужного хлама.
Второй поинтереснее. Он генерирует дату от T до текущей. Делает это равномерно. Таким образом, работая в паре, оба эти метода будут возвращать в основной код данные для последовательной записи, но при этом основной алгоритм остается чист от хранения какой либо ненужной промежуточной информации о текущем этапе генерации данных для конкретной таблицы.
После того как пайпы открыты - приступаем к формированию строки
for _, fld := range table.Fields {
...
switch (fld.Type) {
case "pk":
generator.context.pk = strconv.Itoa(<-generator.numPipe)
str = generator.context.pk
case "timestamp":
generator.context.timestamp_ = <-generator.datePipe
str = generator.context.timestamp_.Format("2006-01-02")
case "string":
str = GenString()
case "list":
str = GenStringByList(fld.Values)
case "copy":
str = fieldsMap[fld.LinkField]
case "parentRandom":
str = pContext.RandomHash[rand.Intn(len(pContext.RandomHash))]
default:
str = "UNKNOWN_COLUMN_TYPE"
}
...
fields = append(fields, str)
}
Все сгенерированные поля сохраняются в срез fields.
Часть генераторов - это просто функции, а не потоки. Сделано это потому, что они не должны хранить контекста на протяжении всего заполнения таблицы - поэтому и не нужно усложнять. А вот первичный ключ и временной ряд содержат контекст и его удобнее хранить в отдельном параллельном методе.
Особое внимание стоит обратить на тип поля parentRandom. Это механизм обеспечения связки со случайным родителем. Поле pContext.RandomHash хранит случайные N хэшей родителя, и когда надо мы просто читаем один случайный. Таким образом рекурсия + небольшое окно хэшей помогает нам рандомизировать связи в БД. Что дает интересный эффект. Так как данные пишутся упорядочено, то данные Хэши родителей всегда созданы ранее, чем дети. Что делает данные еще и логичными.
После того как срез полей заполнен, раскрываем его в метод записи - это просто запись данных на ФС в формате TSV, ничего интересного внутри нет:
// записываем основную таблицу
TSVWriteLine(generator.files[table.Name], fields...)
Последним самым интересным моментом является отправка всего этого в рекурсию, чтобы создавать сложны структуры одним универсальным методом:
// уходим в рекурсию по дочерним таблицам
for _, tbl := range table.Childs {
WriteTable(tbl, generator, generator.context)
}
Укрупненно весь метод генерации получился таким:
func WriteTable (table Table, generator TableGenerator, pContext Context) {
...
generator.numPipe, generator.datePipe = table.OpenPipes()
...
for i := 1; i <= rownum; i++ {
fields := table.CreateFields(&generator, pContext)
...
TSVWriteLine(generator.files[table.Name], fields...)
...
for _, tbl := range table.Childs {
WriteTable(tbl, generator, generator.context)
}
}
}
Ну и для примера мой итоговый метод генерации значений полей.
// Сформировать поля строки
func (table Table) CreateFields (generator *TableGenerator, pContext Context) []string {
var fields []string
fieldsMap := make(map[string]string)
fieldsMap["parent.hash"] = pContext.hash
fieldsMap["parent.timestamp_"] = pContext.timestamp_.Format("2006-01-02")
fieldsMap["parent.source_"] = pContext.source_
// заполняем остальные поля
for _, fld := range table.Fields {
var str string
// смотрим тип полей
switch (fld.Type) {
// в текущей версии источник - это константа
case "const":
str = fld.Const
generator.context.source_ = str
// первичный ключ
case "pk":
generator.context.pk = strconv.Itoa(<-generator.numPipe)
str = generator.context.pk
// временной ряд от даты родителя до настоящего времени
case "timestamp":
generator.context.timestamp_ = <-generator.datePipe
str = generator.context.timestamp_.Format("2006-01-02")
// Хэш от PK
case "hash":
str = GetSHA256(generator.context.pk)
generator.context.hash = str
if len(generator.context.RandomHash) < 10 {
generator.context.RandomHash = append(generator.context.RandomHash, generator.context.hash)
} else {
generator.context.RandomHash[rand.Intn(len(generator.context.RandomHash))] = generator.context.hash
}
case "string":
str = GenString()
case "list":
str = GenStringByList(fld.Values)
// Копия строки. Поле с которой делается копия должно идти после этого поля
case "copy":
str = fieldsMap[fld.LinkField]
case "parentRandom":
str = pContext.RandomHash[rand.Intn(len(pContext.RandomHash))]
default:
str = "UNKNOWN_COLUMN_TYPE"
}
// если нужно PK склеить с родительским
if fld.ConcatWithParent == 1 {
str = fmt.Sprintf("%s-%s", pContext.pk, str)
generator.context.pk = str
}
fields = append(fields, str)
fieldsMap[fld.Name] = str
}
return fields
}
}
Как итог. 200 строк кода способные генерировать 1ГБ связанных данных в минуту. Примерно с такой же скоростью эти данные грузятся в ClickHouse, мне показалось даже быстрее.
Потребление ресурсов:
В основном запись на диск. Пока формировались 50ГБ данных ничего не менялось.
Сами данные выглядят как то так:
Возьмем 69 город и посмотрим его фермы:
Как видите, несмотря на то, что в коде вообще нет огромных таблиц для создания связей, связи создались успешно + время создания ферм стартует от даты создания города.
Как итог - мы добились последовательной генерации больших объемов связанных данных в 200 строк кода. За счет:
пайпов, которые вынесли контекст и не пришлось создавать кучу переменных для каждой таблицы
последовательной генерации по временному ряду
рекурсии - каждая дочерняя таблица создается тем же кодом что и родительская
помещения конфига в адский JSON
Ну и в основной статье я обещал поведать откуда взялись такие странные названия городов - страстный гусь и страстный тапок. Тут все просто. Чтобы все сделать быстро и хорошо провести время, я использовал простой метод:
// Генерируем случайную строку
func GenString() string {
// TODO утащить это в настроечный файл
rndTable := [][]string {{"Милый", "Красный", "Малый", "Большой", "Страстный", "Кривой", "Высокий", "Томный", "Хромой","Отличный",
"Ужасный", "Великий", "Нижний", "Верхний", "Суровый", "Крошечный", "Мытарский",
"Упоротый", "Пьяный", "Шебутной", "Воскресший", "Наивный", "Хвостатый", "Няшный"},
{"Рог", "Нос", "Хряк", "Жук", "Зуб", "Рот", "Утес", "Яр", "Мост","Журавль", "Слон", "Конь", "Тапок", "Танк",
"Люк", "Мух", "Хряк", "Гусь", "Жбан", "Клоп", "Сон", "Портвейн"}}
// TODO заменить хардкод на динамическое перемножение N-мерного массива строк
// для генерации - Милый Наивный Воскресший Гусь и т.п :)
n0 := rand.Intn(len(rndTable[0]))
n1 := rand.Intn(len(rndTable[1]))
return fmt.Sprintf("%s %s", rndTable[0][n0], rndTable[1][n1])
}
Так что, когда я говорю своим друзьям неайтишникам, что писать код еще и весело - они не верят. А зря. Милый Жбан и Ужасный Клоп с ними поспорили бы :)