golang

Golang-генератор TSV данных для импорта в ClickHouse

  • вторник, 3 октября 2023 г. в 00:00:18
https://habr.com/ru/articles/764678/

Занимаясь написанием статьи о Data Vault по разведению кроликов, возникла потребность сгенерировать много данных для ClickHouse. Все генераторы, что смотрел - так и не придумал как сделать 50ГБ данных быстро и эффективно с их помощью. Поэтому решил развлечься и субботний день провести со старым другом. Сразу скажу - я не занимаюсь разработкой на Golang. Это скорее хобби. Так что прошу не судить строго.

Итак, задача:

  • сгенерировать данные для Data Vault от точки N до текущего дня

  • сделать это в формате TSV, так как ClickHouse просто молниеносно грузит данный формат

  • генерация должна быть быстрой

  • данных должно быть много

  • должны быть поддержаны связи между таблицами

Также приложу схему Vault, для которого все делается. Описание можно почитать по ссылке в начале статьи.

В общем звучит все отвратительно. Если зависимости - значит память. А как без них в БД.

После долгих раздумий за чашкой кофе, я сформулировал для себя концепт будущей утилиты:

  • данные должны поточно писаться на диск в момент формирования, и пропадать из памяти

  • в памяти должен храниться только какой-то небольшой интервал времени

Далее было принято решение реализовывать подобие пайпов с использованием потоков Golang, которые должны были генерировать первичные ключи и временной ряд.

Первым делом набросал структуру, которая будет занята генерацией:

type Context struct {
    pk string
    timestamp_ time.Time
    hash string
    source_ string
    RandomHash []string
}

type TableGenerator struct {
    files map[string]*os.File
    numPipe <-chan int
    datePipe <-chan time.Time
    context Context
}

Основная структура TableGenerator собственно будет хранить ссылки на файлы, в которые идет запись, пайпы, а также контекст.

Первым дело создадим два метода, которые позволят инициализировать структуру, а также добавлять файлы для записи.

// Инициалиазция
func (t *TableGenerator) Init() {
    t.files = make(map[string]*os.File)
}

// Открыть файл для очередной таблицы
func (t TableGenerator) AddFile(tableName string) *os.File {
    f, err := os.Create(fmt.Sprintf("tables/%s.tsv", tableName))    
    if err != nil {
        panic(fmt.Sprintf("TableGenerator.AddFile: %s", err.Error()))
    }
    t.files[tableName] = f
    return t.files[tableName]
}

Первичная инициализация структуры и открытие файлов в главном методе приложения выглядит так:

var generator TableGenerator
generator.Init()
for _, tbl := range tableList {
    f := generator.AddFile(tbl)
    defer  f.Close()
}

После того как файлы открыты, нам потребуется рекурсивный метод по наполнению таблиц. На верхнем уровне мы будем бежать по таблицам родителям - h_cities - и спускаться в каждом новом уровне ниже до h_animals. Таким образом на каждую строку верхнего уровня, будет получен набор дочерних. И мы сможем последовательно писать данные спускаясь вниз и возвращаясь обратно к родителю. Таким образом на нижнем уровне рекурсии достаточно знать контекст только одного родителя, что нам и требуется - меньшее потребление памяти.

  • горда (1 уровень)

  • фермы для городов (2 уровень)

  • кролики для ферм (3 уровень)

Сигнатура метода проста:

func WriteTable (table Table, generator TableGenerator, pContext Context) {
  ...
}

table - это некие метаданные таблицы. Их я принял решение генерировать из JSON на базе такой структуры:

type Field struct {
    Name string `json:"name"` // имя поля
    Type string `json:"type"` // тип поля
    Values []string `json:"values"` // списко допустимых значений для Типа List
    Const string `json:"const"` // константа
    Delta int `json:"delta"` // дельта в прошлое в сутках. дата начала генерации
    ConcatWithParent int `json:"concatWithParent"` // Сотавной ключ родитель-текущий
    LinkField string `json:"linkField"`
}

type Table struct {
    Name string `json:"name"` // имя таблицы
    LinkName string `json:"linkName"`
    Rownum int `json:"rownum"` // фиксированное количество строк
    MinRownum int `json:"minRownum"` // количество строк - минимум для рандомного числа
    MaxRownum int `json:"maxRownum"` // мксимум строк - минимум для рандомного числа
    Fields []Field `json:"fields"` // поля таблицы
    Childs []Table `json:"childs"` // дочерние таблицы (рекурсия)
}

type DataBase struct {
    Tables []Table `json:"tables"`
}
Кому интересен пример моего JSON.
{
	"tables": [
		{
			"name": "h_cities",
			"rownum": 1000,
			"fields": [
				{"name": "city_code", "type": "pk"},
				{"name": "city_hsh", "type": "hash"}, 
				{"name": "timestamp_", "type": "timestamp", "delta": 3000}, 
				{"name": "source_", "type": "string", "const":"R"}
			],
			"childs": [
				{
					"name": "h_farms",
					"linkName": "l_city_farms",
					"minRownum": 1,
					"maxRownum": 1000,
					"fields": [
						{"name": "farm_num", "type": "pk", "concatWithParent": 1},
						{"name": "farm_hsh", "type": "hash"}, 
						{"name": "timestamp_", "type": "timestamp"}, 
						{"name": "source_", "type": "string", "const":"R"}
					],
					"childs": [
						{
							"name": "h_animals",
							"linkName": "l_animal_farms",
							"minRownum": 100,
							"maxRownum": 10000,
							"fields": [
								{"name": "animal_num", "type": "pk", "concatWithParent": 1},
								{"name": "animal_hsh", "type": "hash"}, 
								{"name": "timestamp_", "type": "timestamp", "start": "parent"}, 
								{"name": "source_", "type": "string", "const":"R"}
							],
							"childs": [
								{
									"name": "s_animal_attrs",
									"type": "satellite",
									"rownum": 1,
									"fields": [
										{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"},
										{"name": "sex", "type": "list", "values": ["М", "Ж"]},
										{"name": "color", "type": "list", "values": ["Черный", "Белый", "Красный"]},
										{"name": "birthdate", "type": "copy", "linkField": "timestamp_"}
									]
								},
								{
									"name": "s_animal_lifecycle",
									"type": "satellite",
									"rownum": 1,
									"fields": [
										{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"},
										{"name": "status", "type": "list", "values": ["Жив", "Мертв", "Продан", "Продан живым"]}
									]
								},
								{
									"name": "l_animal_tree",
									"type": "recursion",
									"rownum": 1,
									"fields": [
										{"name": "animal_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "animal_mother_hsh", "type": "parentRandom"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"}
									]
								}
							]
						},
						{
							"name": "l_farm_referals",
							"rownum": 1,
							"fields": [
								{"name": "ref_farm_hsh", "type": "copy", "linkField": "parent.hash"},
								{"name": "attract_farm_hsh", "type": "parentRandom"},
								{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
								{"name": "source_", "type": "copy", "linkField": "parent.source_"}
							]
						}
					]
				},
				{
					"name": "s_city_attrs",
					"type": "satellite",
					"rownum": 1,
					"fields": [
						{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
						{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
						{"name": "source_", "type": "copy", "linkField": "parent.source_"},
						{"name": "name_", "type": "string"}
					]
				}
			]
		}
	]
}

Итак файлы открыты, начинаем обход JSON. Первым идет заполнение таблицы верхнего уровня - h_cities - она самая маленькая.

Каждая таблица, перед тем как писаться, должна инициализировать пайпы в соответствии с типами полей. Поэтому обходим структуру таблиц:

// откроем пайпы для полей разного типа
for _, fld := range table.Fields {
    switch (fld.Type) {
        case "pk":
            generator.numPipe = GenKey(1, rownum)
        case "timestamp":
            sdate := pContext.timestamp_
            // если есть дельта - то нужно отнять от текущего ее и использовать эту дату для старта timeline
            if (fld.Delta > 0) {
                sdate = time.Now().AddDate(0, 0, -fld.Delta)
            }
            generator.datePipe = GenTimeline(1, rownum, sdate)
    }
}

Т.е если для генерируемого поля - указан тип pk (первичный ключ), то стартуем пайпу с генерацией числа от 1 до количества строк в таблице. Если тип timestamp - то стартуем временной ряд.

Какие правила выбраны для генерации даты:

  • для таблиц верхнего уровня (h_cities) указывается delta -N дней от текущей даты. От нее начинают генерироваться даты создания городов

  • для таблиц уровнем ниже - h_farms берется интервал от даты генерации города до текущего дня. Т.е пока город существует и до сего дня - могут открываться фермы

  • ну и так далее вниз по иерархии. Переменная pContext.timestamp_ как раз и хранит дату родительской таблицы.

Ну и собственно сама реализация пайпов:

// Генерируем последовательность чисел от snum до enum
func GenKey(snum, enum int) <-chan int {
    numPipe := make(chan int)
    // функция - поток, которая будет играть роль пайпы для генерации числового ряда
    go func() { 
        for i := snum; i <= enum; i++ {
            numPipe <- i
        }
        close(numPipe)
    }()
    return numPipe
}

// Данный пайплайн нужен, чтобы реализовать механизм генерации по временному ряду
func GenTimeline(snum, enum int, sdate time.Time) <-chan time.Time {
    timePipe := make(chan time.Time)
    // функция - поток, которая будет играть роль пайпы для генерации временного ряда
    go func() {
        // первым этапом сделаем равномерный ряд (неравномеризацию будем накручивать следом)
        // просто рассчитаем шаг каждой строки и будем его прибавлять к стартовой дате
        step := (time.Now().Sub(sdate).Hours() / 24)/(float64)(enum - snum + 1)
        for i := snum; i <= enum; i++ { 
            timePipe <- sdate.AddDate(0, 0, (int)(step * (float64)(i)))
        }
        close(timePipe)
    }()
    return timePipe
}

Первый метод прост - он выкидывает по требованию основного потока число от 1 до N. Это сделано для того, чтобы разгрузить основной код от счетчиков и прочего ненужного хлама.

Второй поинтереснее. Он генерирует дату от T до текущей. Делает это равномерно. Таким образом, работая в паре, оба эти метода будут возвращать в основной код данные для последовательной записи, но при этом основной алгоритм остается чист от хранения какой либо ненужной промежуточной информации о текущем этапе генерации данных для конкретной таблицы.

После того как пайпы открыты - приступаем к формированию строки

for _, fld := range table.Fields {
  ...
  switch (fld.Type) {
      case "pk":
          generator.context.pk = strconv.Itoa(<-generator.numPipe)
          str = generator.context.pk
      case "timestamp":
          generator.context.timestamp_ = <-generator.datePipe
          str = generator.context.timestamp_.Format("2006-01-02")
      case "string":
          str = GenString()
      case "list":
          str = GenStringByList(fld.Values)
      case "copy":
          str = fieldsMap[fld.LinkField]
      case "parentRandom":
          str = pContext.RandomHash[rand.Intn(len(pContext.RandomHash))]
      default:
          str = "UNKNOWN_COLUMN_TYPE"
  }
  ...
  fields = append(fields, str)
}

Все сгенерированные поля сохраняются в срез fields.

Часть генераторов - это просто функции, а не потоки. Сделано это потому, что они не должны хранить контекста на протяжении всего заполнения таблицы - поэтому и не нужно усложнять. А вот первичный ключ и временной ряд содержат контекст и его удобнее хранить в отдельном параллельном методе.

Особое внимание стоит обратить на тип поля parentRandom. Это механизм обеспечения связки со случайным родителем. Поле pContext.RandomHash хранит случайные N хэшей родителя, и когда надо мы просто читаем один случайный. Таким образом рекурсия + небольшое окно хэшей помогает нам рандомизировать связи в БД. Что дает интересный эффект. Так как данные пишутся упорядочено, то данные Хэши родителей всегда созданы ранее, чем дети. Что делает данные еще и логичными.

После того как срез полей заполнен, раскрываем его в метод записи - это просто запись данных на ФС в формате TSV, ничего интересного внутри нет:

// записываем основную таблицу
TSVWriteLine(generator.files[table.Name], fields...)

Последним самым интересным моментом является отправка всего этого в рекурсию, чтобы создавать сложны структуры одним универсальным методом:

// уходим в рекурсию по дочерним таблицам
for _, tbl := range table.Childs {
    WriteTable(tbl, generator, generator.context)
}

Укрупненно весь метод генерации получился таким:

func WriteTable (table Table, generator TableGenerator, pContext Context) {
  ...
  generator.numPipe, generator.datePipe = table.OpenPipes()
  ...
  for i := 1; i <= rownum; i++ {
    fields := table.CreateFields(&generator, pContext)
    ...
    TSVWriteLine(generator.files[table.Name], fields...)
    ...
    for _, tbl := range table.Childs {
        WriteTable(tbl, generator, generator.context)
    }
  }
}

Ну и для примера мой итоговый метод генерации значений полей.

// Сформировать поля строки
func (table Table) CreateFields (generator *TableGenerator, pContext Context) []string {
    var fields []string
    fieldsMap := make(map[string]string)

    fieldsMap["parent.hash"] = pContext.hash
    fieldsMap["parent.timestamp_"] = pContext.timestamp_.Format("2006-01-02")
    fieldsMap["parent.source_"] = pContext.source_

    // заполняем остальные поля
    for _, fld := range table.Fields {
        var str string
        // смотрим тип полей
        switch (fld.Type) {
            // в текущей версии источник - это константа
            case "const":
                str = fld.Const
                generator.context.source_ = str
            // первичный ключ
            case "pk":
                generator.context.pk = strconv.Itoa(<-generator.numPipe)
                str = generator.context.pk
            // временной ряд от даты родителя до настоящего времени
            case "timestamp":
                generator.context.timestamp_ = <-generator.datePipe
                str = generator.context.timestamp_.Format("2006-01-02")
            // Хэш от PK
            case "hash":
                str = GetSHA256(generator.context.pk)
                generator.context.hash = str
                if len(generator.context.RandomHash) < 10 {
                    generator.context.RandomHash = append(generator.context.RandomHash, generator.context.hash)
                } else {
                    generator.context.RandomHash[rand.Intn(len(generator.context.RandomHash))] = generator.context.hash
                }
            case "string":
                str = GenString()
            case "list":
                str = GenStringByList(fld.Values)
            // Копия строки. Поле с которой делается копия должно идти после этого поля
            case "copy":
                str = fieldsMap[fld.LinkField]
            case "parentRandom":
                str = pContext.RandomHash[rand.Intn(len(pContext.RandomHash))]
            default:
                str = "UNKNOWN_COLUMN_TYPE"
        }
        // если нужно PK склеить с родительским
        if fld.ConcatWithParent == 1 {
            str = fmt.Sprintf("%s-%s", pContext.pk, str)
            generator.context.pk = str
        }
        fields = append(fields, str)
        fieldsMap[fld.Name] = str
    }
    return fields
}
}

Как итог. 200 строк кода способные генерировать 1ГБ связанных данных в минуту. Примерно с такой же скоростью эти данные грузятся в ClickHouse, мне показалось даже быстрее.

Потребление ресурсов:

В основном запись на диск. Пока формировались 50ГБ данных ничего не менялось.

Сами данные выглядят как то так:

h_cities
h_cities

Возьмем 69 город и посмотрим его фермы:

h_farms
h_farms

Как видите, несмотря на то, что в коде вообще нет огромных таблиц для создания связей, связи создались успешно + время создания ферм стартует от даты создания города.

Как итог - мы добились последовательной генерации больших объемов связанных данных в 200 строк кода. За счет:

  • пайпов, которые вынесли контекст и не пришлось создавать кучу переменных для каждой таблицы

  • последовательной генерации по временному ряду

  • рекурсии - каждая дочерняя таблица создается тем же кодом что и родительская

  • помещения конфига в адский JSON

Ну и в основной статье я обещал поведать откуда взялись такие странные названия городов - страстный гусь и страстный тапок. Тут все просто. Чтобы все сделать быстро и хорошо провести время, я использовал простой метод:

// Генерируем случайную строку
func GenString() string {
    // TODO утащить это в настроечный файл
    rndTable := [][]string {{"Милый", "Красный", "Малый", "Большой", "Страстный", "Кривой", "Высокий", "Томный", "Хромой","Отличный",
                             "Ужасный", "Великий", "Нижний", "Верхний", "Суровый", "Крошечный", "Мытарский",
                             "Упоротый", "Пьяный", "Шебутной", "Воскресший", "Наивный", "Хвостатый", "Няшный"},
                            {"Рог", "Нос", "Хряк", "Жук", "Зуб", "Рот", "Утес", "Яр", "Мост","Журавль", "Слон", "Конь", "Тапок", "Танк",
                             "Люк", "Мух", "Хряк", "Гусь", "Жбан", "Клоп", "Сон", "Портвейн"}}

    // TODO заменить хардкод на динамическое перемножение N-мерного массива строк
    // для генерации - Милый Наивный Воскресший Гусь и т.п :)
    n0 := rand.Intn(len(rndTable[0]))
    n1 := rand.Intn(len(rndTable[1]))

    return fmt.Sprintf("%s %s", rndTable[0][n0], rndTable[1][n1])
}

Так что, когда я говорю своим друзьям неайтишникам, что писать код еще и весело - они не верят. А зря. Милый Жбан и Ужасный Клоп с ними поспорили бы :)