golang

Челлендж по обработке миллиарда строк на Go: от 1 минуты 45 секунд до 4 секунд

  • пятница, 8 марта 2024 г. в 00:00:16
https://habr.com/ru/articles/798215/

Пару недель назад я прочитал о запавшем мне в душу челлендже по обработке миллиарда строк, поэтому захотел решить его на Go.

Я немного опоздал, соревнования проводились в январе. И на Java. Меня не особо интересует Java, зато давно интересует оптимизация кода на Go.

Этот челлендж был очень прост: обработать текстовый файл названий метеорологических станций и температур, и для каждой станции вывести минимальное, среднее и максимальное значение. Чтобы упростить задачу, было ещё несколько ограничений, однако я проигнорировал те, что относятся только к Java.

Вот несколько строк с примером входных данных:

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
...

Единственная тонкость заключается в том, что входной файл состоит из миллиарда строк. Это примерно 13 ГБ данных. Я уже разобрался, что дисковый ввод-вывод не становится узким местом, обычно замедляют подобные программы распределения памяти и парсинг.

В этой статье описывается девять написанных мной на Go решений, каждое из которых быстрее предыдущего. Первое, простое и идиоматичное, выполняется на моей машине 1 минуту 45 секунд, а последнее — примерно 4 секунды. По ходу дела я буду показывать, как использовал профилировщик Go, чтобы понимать, на что тратится время.

Вот список решений, от самого медленного до самого быстрого:

  • r1: простое и идиоматичное

  • r2: map со значениями указателей

  • r3: ручной парсинг температур

  • r4: целые числа с фиксированной запятой

  • r5: избегаем bytes.Cut

  • r6: избегаем bufio.Scanner

  • r7: специальная хэш-таблица

  • r8: распараллеленное r1

  • r9: распараллеленное r7

Я хотел, чтобы каждое из решений было портируемым кодом на Go, использующим только стандартную библиотеку: никакого ассемблера, никаких unsafe и никаких файлов с отображением в память. Для меня 4 секунды, или 3,2 ГБ/с показались достаточно быстрым результатом. Для сравнения: самое быстрое, сильно оптимизированное решение на Java выполняется на моей машине меньше чем за секунду — неплохо!

Уже существует множество готовых решений на Go и по крайней мере одна хорошая статья. Моё решение быстрее некоторых других, но чуть медленнее, чем самое быстрое. Однако перед написанием своего я не изучал другие — хотел, чтобы мои решения были независимыми.

Если вам интересны только показатели, то перейдите в конец статьи, там есть таблица с результатами.

Отправная точка

Вот несколько исходных показателей для понимания, на что ориентироваться. Во-первых сколько времени нужно для простого считывания 13 ГБ данных при помощи cat:

$ time cat measurements.txt >/dev/null
0m1.052s

Стоит отметить, что это лучший показатель из пяти, то есть я позволил файлу кэшироваться. Кто знает, допускает ли Linux хранить все 13 ГБ в дисковом кэше; предположительно, да, потому что в первый раз это заняло почти 6 секунд.

Для сравнения, выполнение каких-то операций происходит существенно медленнее: wc занимает почти минуту:

$ time wc measurements.txt 
 1000000000  1179173106 13795293380 measurements.txt
0m55.710s

Для создания простого решения этой задачи я, вероятно, начну с AWK. В этом решении используется Gawk, потому что сортировать вывод проще функцией asorti. Я воспользовался опцией -b, чтобы применить режим «символы как байты», что немного ускоряет работу:

$ time gawk -b -f 1brc.awk measurements.txt >measurements.out
7m35.567s

Уверен, что смогу побить показатель в 7 минут даже с простым решением на Go, так что давайте начнём с этого.

Я начну с оптимизации последовательной одноядерной версии (решения 1-7), а затем распараллелю его (решения 8 и 9). Все результаты получены с Go 1.21.5 на ноутбуке с linux/amd64, быстрым SSD-накопителем и 32 ГБ ОЗУ.

Многие из моих решений и большинство самых быстрых решений предполагают, что входные данные всегда валидны. Например, что температуры имеют ровно одно значение после запятой. Многие мои решения приведут к runtime panic или к неверным результатам в случае невалидных входных данных.

Решение 1: простой и идиоматичный Go

Я хотел, чтобы первая версия была простым, незатейливым решением с использованием инструментов только из стандартной библиотеки Go: bufio.Scanner для считывания строк, strings.Cut для разбиения по ';'strconv.ParseFloat для парсинга температур и обычной map Go для накапливания результатов.

Сначала я покажу решение целиком, а потом объясню самые интересные части:

func r1(inputPath string, output io.Writer) error {
    type stats struct {
        min, max, sum float64
        count         int64
    }

    f, err := os.Open(inputPath)
    if err != nil {
        return err
    }
    defer f.Close()

    stationStats := make(map[string]stats)

    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Text()
        station, tempStr, hasSemi := strings.Cut(line, ";")
        if !hasSemi {
            continue
        }

        temp, err := strconv.ParseFloat(tempStr, 64)
        if err != nil {
            return err
        }

        s, ok := stationStats[station]
        if !ok {
            s.min = temp
            s.max = temp
            s.sum = temp
            s.count = 1
        } else {
            s.min = min(s.min, temp)
            s.max = max(s.max, temp)
            s.sum += temp
            s.count++
        }
        stationStats[station] = s
    }

    stations := make([]string, 0, len(stationStats))
    for station := range stationStats {
        stations = append(stations, station)
    }
    sort.Strings(stations)

    fmt.Fprint(output, "{")
    for i, station := range stations {
        if i > 0 {
            fmt.Fprint(output, ", ")
        }
        s := stationStats[station]
        mean := s.sum / float64(s.count)
        fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
    }
    fmt.Fprint(output, "}\n")
    return nil
}

Это базовое решение обрабатывает один миллиард строк за 1 минуту 45 секунд. Точно лучше, чем 7 минут решения на AWK.

Решение 2: map со значениями указателей

Я обучался на создании своей программы count-words, в которой происходит чуть больше хэширования, чем нужно. В каждой строке мы хэшируем строку символов дважды: сначала когда пытаемся получить значение из map, а потом когда обновляем map.

Чтобы избежать этого, можно использовать map[string]stats (значения указателей) и обновлять адресуемую указателем struct, вместо map[string]stats и обновления самой хэш-таблицы.

Однако сначала я хотел убедиться в этом при помощи профилировщика Go. Чтобы добавить профилирование CPU к программе на Go, достаточно всего нескольких строк.

$ ./go-1brc -cpuprofile=cpu.prof -revision=1 measurements-10000000.txt >measurements-10000000.out
Processed 131.6MB in 965.888929ms
$ go tool pprof -http=: cpu.prof
...

Эти команды создали следующий профиль решения 1, пропущенного через урезанный входной файл в 10 миллионов строк:

Profile of solution r1
Профиль решения r1

Операции с map занимают целых 30% времени: 12,24% на присвоение и 17,35% на поиск. Использовав значение указателя, мы должны избавиться от основной части времени на присвоения map.

Примечание: это изображение профиля также демонстрирует, на что тратится остальная часть времени:

  • Сканирование строк при помощи Scanner.Scan

  • Поиск ';' при помощи strings.Cut

  • Парсинг температуры при помощи strconv.ParseFloat

  • Вызов Scanner.Text, распределяющего строку символов для строки файла

Как бы то ни было, моё второе решение стало лишь небольшим улучшением операций с map:

stationStats := make(map[string]*stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
    // ...
    s := stationStats[station]
    if s == nil {
        stationStats[station] = &stats{
            min:   temp,
            max:   temp,
            sum:   temp,
            count: 1,
        }
    } else {
        s.min = min(s.min, temp)
        s.max = max(s.max, temp)
        s.sum += temp
        s.count++
    }
}

В общем случае, когда станция существует в map, мы теперь выполняем только одну операцию с map, s := stationStats[station], чтобы хэширование названия станции и доступ к хэш-таблице должно было выполняться только один раз. Если оно уже есть в map (общий случай для одного миллиарда строк), то мы обновляем существующую struct, адресуемую указателем.

Это не особо помогает, но что-то даёт: использование значений указателей в map снижает время выполнения с 1 минуты 45 секунд до 1 минуты 31 секунды.

Решение 3: избегаем strconv.ParseFloat

В третьем решении всё наконец становится более хардкорным: мы будем парсить температуру при помощи собственного кода, а не через strconv.ParseFloat. Функция стандартной библиотеки обрабатывает кучу пограничных случаев, которые для простых температур входных данных нам поддерживать не нужно: у нас будет всего две или три цифры в формате 1.2 или  34.5 (и некоторые с минусом перед числом).

Кроме того, strconv.ParseFloat получает аргумент string, и теперь, когда мы её не используем, можно обойтись байтовым срезом непосредственно из Scanner.Bytes, а не распределять и копировать строку при помощи Scanner.Text.

Теперь мы парсим температуру вот так:

negative := false
index := 0
if tempBytes[index] == '-' {
    index++
    negative = true
}
temp := float64(tempBytes[index] - '0') // парсим первую цифру
index++
if tempBytes[index] != '.' {
    temp = temp*10 + float64(tempBytes[index]-'0') // парсим опциональную вторую цифру
    index++
}
index++ // skip '.'
temp += float64(tempBytes[index]-'0') / 10 // парсим десятичную цифру
if negative {
    temp = -temp
}

Не особо красиво, но и сложного ничего нет. Так мы снизили время с 1 минуты 31 секунды до менее чем минуты: 55,8 секунды.

Решение 4: целые числа с фиксированной запятой

В былые времена команды с плавающей запятой были гораздо медленнее, чем целочисленные. Сегодня они лишь немного медленнее, но, вероятно, стоит по возможности избегать их.

В нашей задаче каждая температура имеет один разряд после запятой, так что для её описания можно легко использовать целые числа с фиксированной запятой. Например, мы можем представить 34,5 как целочисленное 345. И только в самом конце, перед непосредственным выводом результатов, мы преобразуем их обратно во float.

То есть моё четвёртое решение, по сути, такое же, как и решение 3, но со следующим полем struct stats:

type stats struct {
    min, max, count int32
    sum             int64
}

Перед выводом результатов его нужно разделить на 10:

mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f",
    station, float64(s.min)/10, mean, float64(s.max)/10)

Для минимальных и максимальных температур я использовал 32-битные integer, так как максимум, вероятно, будет примерно 500 (50 градусов Цельсия). Можно использовать int16, но из прежнего опыта я сделал вывод, что современные 64-битные CPU чуть медленнее работают с 16-битными integer, чем с 32-битными. В моих тестах они не показали какой-то значимой разницы, но я всё равно выбрал 32-битные.

Использование integer снизило время с 55,8 секунды до 51,0 секунды — небольшой выигрыш.

Решение 5: избегаем bytes.Cut

Чтобы создать решение 5, я записал ещё один профиль (решения 4):

Profile of solution r4
Профиль решения r4

Итак, всё стало сложнее. Операции map доминируют, и переход к специализированной хэш-таблице может оказаться немного неочевидным. Поэтому мы избавимся от bufio.Scanner. Давайте попрокрастинируем и избавимся от bytes.Cut.

Я подумал, что это простой способ экономии времени. Взгляните на этот пример строки:

New Orleans;11.7

Будет быстрее парсить температуру с конца и находить ';' там, чем сканировать всё название станции в поисках ';'. Этот довольно уродливый код делает именно это:

end := len(line)
tenths := int32(line[end-1] - '0')
ones := int32(line[end-3] - '0') // line[end-2] is '.'
var temp int32
var semicolon int
if line[end-4] == ';' {          // положительная температура N.N
    temp = ones*10 + tenths
    semicolon = end - 4
} else if line[end-4] == '-' {   // отрицательная температура -N.N
    temp = -(ones*10 + tenths)
    semicolon = end - 5
} else {
    tens := int32(line[end-4] - '0')
    if line[end-5] == ';' {      // положительная температура NN.N
        temp = tens*100 + ones*10 + tenths
        semicolon = end - 5
    } else {                     // отрицательная температура -NN.N
        temp = -(tens*100 + ones*10 + tenths)
        semicolon = end - 6
    }
}
station := line[:semicolon]

Отказавшись от bytes.Cut, мы снизили время с 51,0 секунды до 46,0 секунды — ещё одна маленькая победа.

Решение 6: избегаем bufio.Scanner

Теперь мы попробуем избавиться от bufio.Scanner. Задумайтесь: чтобы найти конец каждой строки файла, сканеру приходится обходить все байты в поисках символа переноса строки. Затем мы снова обрабатываем множество байтов, чтобы спарсить температуру и найти ';'. Так что давайте попробуем объединить эти этапы и избавимся от bufio.Scanner.

В решении 6 мы распределяем буфер на 1 МБ для чтения файла большими блоками, ищем последний символ переноса строки в блоке, чтобы убедиться, что мы не разрезаем строку пополам, а затем обрабатываем каждый блок. Это выглядит так:

buf := make([]byte, 1024*1024)
readStart := 0
for {
    n, err := f.Read(buf[readStart:])
    if err != nil && err != io.EOF {
        return err
    }
    if readStart+n == 0 {
        break
    }
    chunk := buf[:readStart+n]

    newline := bytes.LastIndexByte(chunk, '\n')
    if newline < 0 {
        break
    }
    remaining := chunk[newline+1:]
    chunk = chunk[:newline+1]

    for {
        station, after, hasSemi := bytes.Cut(chunk, []byte(";"))
        // ... дальше та же обработка температуры, что и в r4 ...

Устранениеbufio.Scanner и выполнение собственного сканирования снизило время с 46,0 секунды до 41,3 секунды. Ещё одна крошечная победа, но мы примем её.

Решение 7: специальная хэш-таблица

На решении 7 шутки заканчиваются. Мы реализуем собственную хэш-таблицу вместо map Go. У этого подхода есть два преимущества:

  1. Мы сможем хэшировать название станции в процессе поиска ';', избегая повторной обработки байтов.

  2. Мы можем хранить каждый ключ в нашей хэш-таблице как байтовый срез, избегая необходимости преобразовывать каждый ключ в string (при этом выполняется распределение и копирование для каждой строки файла).

Я писал о том, как реализовать хэш-таблицу на C, но также я реализовал и собственную «счётную» хэш-таблицу на Go, откуда и взял эту реализацию.

Это простая реализация, использующая алгоритм хэширования FNV-1a с линейным зондированием: если возникает коллизия, задействуется следующий пустой слот.

Чтобы упростить, я заранее распределяю большое количество хэш-бакетов (я использовал 100000), чтобы избежать необходимости написания логики изменения размера страницы. Если таблица заполнится больше, чем наполовину, у кода произойдёт panic. Я замерил, что мы получим примерно 2% коллизий хэшей.

На этот раз кода гораздо больше — подготовка хэш‑таблицы, само хэширование, зондирование таблицы и вставка:

// Структура хэш-таблицы:
type item struct {
    key  []byte
    stat *stats
}
items := make([]item, 100000) // хэш-бакеты с линейным зондированием
size := 0                     // количество активных элементов в срезе элементов

buf := make([]byte, 1024*1024)
readStart := 0
for {
    // ... то же разбиение на блоки, что и в r6 ...

    for {
        const (
            // 64-битные константы FNV-1 из hash/fnv.
            offset64 = 14695981039346656037
            prime64  = 1099511628211
        )

        // Хэшируем название станции и ищем ';'.
        var station, after []byte
        hash := uint64(offset64)
        i := 0
        for ; i < len(chunk); i++ {
            c := chunk[i]
            if c == ';' {
                station = chunk[:i]
                after = chunk[i+1:]
                break
            }
            hash ^= uint64(c) // FNV-1a is XOR then *
            hash *= prime64
        }
        if i == len(chunk) {
            break
        }

        // ... тот же парсинг температур, что и в r6 ...

        // Переходим к нужному бакету в хэш-таблице.
        hashIndex := int(hash & uint64(len(items)-1))
        for {
            if items[hashIndex].key == nil {
                // Найден пустой слот, добавляем новый элемент (копируем ключ).
                key := make([]byte, len(station))
                copy(key, station)
                items[hashIndex] = item{
                    key: key,
                    stat: &stats{
                        min:   temp,
                        max:   temp,
                        sum:   int64(temp),
                        count: 1,
                    },
                }
                size++
                if size > len(items)/2 {
                    panic("too many items in hash table")
                }
                break
            }
            if bytes.Equal(items[hashIndex].key, station) {
                // Найден совпадающий слот, прибавляем к имеющейся статистике.
                s := items[hashIndex].stat
                s.min = min(s.min, temp)
                s.max = max(s.max, temp)
                s.sum += int64(temp)
                s.count++
                break
            }
            // Слот уже содержит другой ключ, пробуем следующий слот (линейное зондирование).
            hashIndex++
            if hashIndex >= len(items) {
                hashIndex = 0
            }
        }
    }

    readStart = copy(buf, remaining)
}

Отдача от всего этого кода велика: специальная хэш-таблица снижает время с 41,3 до 25,8 секунды.

Решение 8: параллельная обработка блоков

В решении 8 я хотел добавить параллелизм. Однако я решил вернуться к простому и идиоматичному коду из первого решения, использующему bufio.Scanner и strconv.ParseFloat, распараллелив его. Так мы увидим, что даёт лучшие результаты, оптимизация или параллелизация, а в девятом решении реализуем и то, и другое.

Задачу map-reduce параллелизировать очень легко: разбить файл на блоки схожего размера (по одному на каждое ядро CPU), запустить поток (в Go горутину) для обработки каждого блока, а в конце объединить результаты.

Вот, как это выглядит на высоком уровне:

// Определяем непересекающиеся части для разбиения файла (каждая часть имеет смещение и размер).
parts, err := splitFile(inputPath, maxGoroutines)
if err != nil {
    return err
}

// Запускаем горутину для обработки каждой части, возвращая результаты в канал.
resultsCh := make(chan map[string]r8Stats)
for _, part := range parts {
    go r8ProcessPart(inputPath, part.offset, part.size, resultsCh)
}

// Ждём возврата результатов и агрегируем их.
totals := make(map[string]r8Stats)
for i := 0; i < len(parts); i++ {
    result := <-resultsCh
    for station, s := range result {
        ts, ok := totals[station]
        if !ok {
            totals[station] = r8Stats{
                min:   s.min,
                max:   s.max,
                sum:   s.sum,
                count: s.count,
            }
            continue
        }
        ts.min = min(ts.min, s.min)
        ts.max = max(ts.max, s.max)
        ts.sum += s.sum
        ts.count += s.count
        totals[station] = ts
    }
}

Функция splitFile довольно скучна, поэтому я не стал её сюда включать. Она смотрит на размер файла, делит его на нужное нам количество частей, а затем ищет каждую часть, считывая 100 байтов перед концом и находя последний символ переноса строки, чтобы гарантировать, что каждая часть заканчивается полной строкой файла.

Функция r8ProcessPart, по сути, такая же, как и в решении r1, но она начинает с перехода к смещению части и ограничивает длину размером части (при помощи io.LimitedReader). Закончив, она отправляет собственную map статистики обратно в канал:

func r8ProcessPart(inputPath string, fileOffset, fileSize int64,
                   resultsCh chan map[string]r8Stats) {
    file, err := os.Open(inputPath)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    _, err = file.Seek(fileOffset, io.SeekStart)
    if err != nil {
        panic(err)
    }
    f := io.LimitedReader{R: file, N: fileSize}

    stationStats := make(map[string]r8Stats)

    scanner := bufio.NewScanner(&f)
    for scanner.Scan() {
        // ... та же обработка, что и в r1 ...
    }

    resultsCh <- stationStats
}

Параллельная обработка входного файла обеспечивает существенный выигрыш по сравнению с r1, снижая время с 1 минуты 45 секунд до 24,3 секунды. Для сравнения: предыдущая «оптимизированная непараллельная» версия (решение 7) занимала 25,8 секунды. То есть в этом случае параллелизация чуть быстрее оптимизации, к тому же намного проще.

Решение 9: все оптимизации плюс параллелизация

В решении 9, нашей последней попытке, мы просто соединим все оптимизации с r1 по r7 с параллелизацией, реализованной в r8.

Я использовал ту же функцию splitFile из r8, а остальную часть кода просто скопировал из r7, поэтому здесь нет ничего нового. За исключением результатов: эта окончательная версия снизила время с 24,3 секунды до 3,99 секунды — огромная победа.

Любопытно, что поскольку вся реальная обработка теперь находится в одной большой функции r9ProcessPart, граф профиля больше не особо полезен. Вот как он теперь выглядит:

Profile of solution r9
Профиль решения r9

Как видите, 82% времени тратится на r9ProcessPart, bytes.Equal занимает 13%, а считывание файла занимает оставшиеся 5%.

Если мы хотим добиться более подробного профилирования, нам нужно уйти глубже, чем уровень функций, который нам показывает режим графа, и использовать режим исходников. Вот внутренний цикл:

Profile of solution r9 - source view
Profile of solution r9 — source view

Меня этот отчёт сбил с толку. Почему в нём показано, что if items[hashIndex].key == nil занимает 5,01 с, но у вызова bytes.Equal показано всего 390 мс? Поиск среза ведь гораздо менее затратен, чем вызов функции? Если вы специалист по производительности Go и поможете мне это проинтерпретировать, то напишите!

Как бы то ни было, я уверен, что можно придумать и более безумные оптимизации, но я решил остановиться на этом. Для меня обработка миллиарда строк за 4 секунды, или по 250 миллиона строк в секунду — это вполне достаточно.

Таблица результатов

Ниже приведена таблица со всеми моими решениями на Go, а также с самыми быстрыми решениями на Go и на Java. Каждый результат — лучший из пяти прогонов решения с одинаковым входным файлом из миллиарда строк.

Версия

Описание

Время

Время относительно r1

r1

простое и идиоматичное

1 мин 45

1,00

r2

map со значениями указателей

1 мин 31

1,15

r3

парсинг температур вручную

55,8 с

1,87

r4

целые числа с фиксированной запятой

51,0 с

2,05

r5

избавляемся от bytes.Cut

46,0 с

2,27

r6

избавляемся от bufio.Scanner

41,3 с

2,53

r7

специальная хэш-таблица

25,8 с

4,05

r8

распараллеленное r1

24,3 с

4,31

r9

распараллеленное r7

3,99 с

26,2

AY

самая быстрая версия на Go

2,90 с

36,2

TW

самая быстрая версия на Java

0,953 с

110

Я приблизительно в том же интервале, что и версия на Go Александра Ястребова (AY). Его решение похоже на моё: разбиваем файл на блоки, используем специальную хэш-таблицу (он даже как и я использовал хэширование FNV) и парсим температуру как integer. Однако он использует файлы с отображением в память, от которых я отказался из соображений портируемости. Вероятно, поэтому его версия немного быстрее.

Томас Вюртенгер (при помощи других разработчиков) создал в исходном челлендже на Java самое быстрое решение. На моей машине оно выполняется меньше, чем за секунду, в четыре раза быстрее, чем моя версия на Go. Похоже, что наряду с параллельной обработкой и отображением файлов в память он использовал развёрнутые циклы, код парсинга без ветвления и другие низкоуровневые трюки.

Кажется, Томас — основатель и важный контрибьютор в GraalVM (быстрой Java Virtual Machine с компиляцией перед исполнением). Так что он определённо специалист в этой области. Отличная работа, Томас!

Последние комментарии

Почему всё это важно?

В случае большинства повседневных программных задач лучше всего начинать с простого и идиоматичного кода. Если вы вычисляете статистику для миллиарда температур, а ответ нужен лишь один раз, то 1 минуты 45 секунд, вам, вероятно, будет достаточно.

Но если вы создаёте конвейер обработки данных и сможете ускорить код в четыре, а то и в 26 раз, то вы не только порадуете пользователей, но и сможете серьёзно сэкономить в затратах на вычислительные ресурсы — если система хорошо загружена, то затраты на вычисления могут быть в 4 или 26 раза меньше исходных!

А если вы создаёте среду исполнения наподобие GraalVM или интерпретатор наподобие GoAWK, то такой уровень производительности очень важен: если вы ускорите интерпретатор, то и программы пользователей тоже будут выполняться гораздо быстрее.

Кроме того, просто интересно писать код, выжимающий из машины максимум.