Челлендж по обработке миллиарда строк на Go: от 1 минуты 45 секунд до 4 секунд
- пятница, 8 марта 2024 г. в 00:00:16
Пару недель назад я прочитал о запавшем мне в душу челлендже по обработке миллиарда строк, поэтому захотел решить его на Go.
Я немного опоздал, соревнования проводились в январе. И на Java. Меня не особо интересует Java, зато давно интересует оптимизация кода на Go.
Этот челлендж был очень прост: обработать текстовый файл названий метеорологических станций и температур, и для каждой станции вывести минимальное, среднее и максимальное значение. Чтобы упростить задачу, было ещё несколько ограничений, однако я проигнорировал те, что относятся только к Java.
Вот несколько строк с примером входных данных:
Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
...
Единственная тонкость заключается в том, что входной файл состоит из миллиарда строк. Это примерно 13 ГБ данных. Я уже разобрался, что дисковый ввод-вывод не становится узким местом, обычно замедляют подобные программы распределения памяти и парсинг.
В этой статье описывается девять написанных мной на Go решений, каждое из которых быстрее предыдущего. Первое, простое и идиоматичное, выполняется на моей машине 1 минуту 45 секунд, а последнее — примерно 4 секунды. По ходу дела я буду показывать, как использовал профилировщик Go, чтобы понимать, на что тратится время.
Вот список решений, от самого медленного до самого быстрого:
r1: простое и идиоматичное
r2: map со значениями указателей
r3: ручной парсинг температур
r4: целые числа с фиксированной запятой
r5: избегаем bytes.Cut
r6: избегаем bufio.Scanner
r7: специальная хэш-таблица
r8: распараллеленное r1
r9: распараллеленное r7
Я хотел, чтобы каждое из решений было портируемым кодом на Go, использующим только стандартную библиотеку: никакого ассемблера, никаких unsafe
и никаких файлов с отображением в память. Для меня 4 секунды, или 3,2 ГБ/с показались достаточно быстрым результатом. Для сравнения: самое быстрое, сильно оптимизированное решение на Java выполняется на моей машине меньше чем за секунду — неплохо!
Уже существует множество готовых решений на Go и по крайней мере одна хорошая статья. Моё решение быстрее некоторых других, но чуть медленнее, чем самое быстрое. Однако перед написанием своего я не изучал другие — хотел, чтобы мои решения были независимыми.
Если вам интересны только показатели, то перейдите в конец статьи, там есть таблица с результатами.
Вот несколько исходных показателей для понимания, на что ориентироваться. Во-первых сколько времени нужно для простого считывания 13 ГБ данных при помощи cat
:
$ time cat measurements.txt >/dev/null
0m1.052s
Стоит отметить, что это лучший показатель из пяти, то есть я позволил файлу кэшироваться. Кто знает, допускает ли Linux хранить все 13 ГБ в дисковом кэше; предположительно, да, потому что в первый раз это заняло почти 6 секунд.
Для сравнения, выполнение каких-то операций происходит существенно медленнее: wc
занимает почти минуту:
$ time wc measurements.txt
1000000000 1179173106 13795293380 measurements.txt
0m55.710s
Для создания простого решения этой задачи я, вероятно, начну с AWK. В этом решении используется Gawk, потому что сортировать вывод проще функцией asorti
. Я воспользовался опцией -b
, чтобы применить режим «символы как байты», что немного ускоряет работу:
$ time gawk -b -f 1brc.awk measurements.txt >measurements.out
7m35.567s
Уверен, что смогу побить показатель в 7 минут даже с простым решением на Go, так что давайте начнём с этого.
Я начну с оптимизации последовательной одноядерной версии (решения 1-7), а затем распараллелю его (решения 8 и 9). Все результаты получены с Go 1.21.5 на ноутбуке с linux/amd64, быстрым SSD-накопителем и 32 ГБ ОЗУ.
Многие из моих решений и большинство самых быстрых решений предполагают, что входные данные всегда валидны. Например, что температуры имеют ровно одно значение после запятой. Многие мои решения приведут к runtime panic или к неверным результатам в случае невалидных входных данных.
Я хотел, чтобы первая версия была простым, незатейливым решением с использованием инструментов только из стандартной библиотеки Go: bufio.Scanner
для считывания строк, strings.Cut
для разбиения по ';'
, strconv.ParseFloat
для парсинга температур и обычной map
Go для накапливания результатов.
Сначала я покажу решение целиком, а потом объясню самые интересные части:
func r1(inputPath string, output io.Writer) error {
type stats struct {
min, max, sum float64
count int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
station, tempStr, hasSemi := strings.Cut(line, ";")
if !hasSemi {
continue
}
temp, err := strconv.ParseFloat(tempStr, 64)
if err != nil {
return err
}
s, ok := stationStats[station]
if !ok {
s.min = temp
s.max = temp
s.sum = temp
s.count = 1
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += temp
s.count++
}
stationStats[station] = s
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := s.sum / float64(s.count)
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
}
fmt.Fprint(output, "}\n")
return nil
}
Это базовое решение обрабатывает один миллиард строк за 1 минуту 45 секунд. Точно лучше, чем 7 минут решения на AWK.
Я обучался на создании своей программы count-words, в которой происходит чуть больше хэширования, чем нужно. В каждой строке мы хэшируем строку символов дважды: сначала когда пытаемся получить значение из map, а потом когда обновляем map.
Чтобы избежать этого, можно использовать map[string]stats
(значения указателей) и обновлять адресуемую указателем struct, вместо map[string]stats
и обновления самой хэш-таблицы.
Однако сначала я хотел убедиться в этом при помощи профилировщика Go. Чтобы добавить профилирование CPU к программе на Go, достаточно всего нескольких строк.
$ ./go-1brc -cpuprofile=cpu.prof -revision=1 measurements-10000000.txt >measurements-10000000.out
Processed 131.6MB in 965.888929ms
$ go tool pprof -http=: cpu.prof
...
Эти команды создали следующий профиль решения 1, пропущенного через урезанный входной файл в 10 миллионов строк:
Операции с map занимают целых 30% времени: 12,24% на присвоение и 17,35% на поиск. Использовав значение указателя, мы должны избавиться от основной части времени на присвоения map.
Примечание: это изображение профиля также демонстрирует, на что тратится остальная часть времени:
Сканирование строк при помощи Scanner.Scan
Поиск ';'
при помощи strings.Cut
Парсинг температуры при помощи strconv.ParseFloat
Вызов Scanner.Text
, распределяющего строку символов для строки файла
Как бы то ни было, моё второе решение стало лишь небольшим улучшением операций с map:
stationStats := make(map[string]*stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
// ...
s := stationStats[station]
if s == nil {
stationStats[station] = &stats{
min: temp,
max: temp,
sum: temp,
count: 1,
}
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += temp
s.count++
}
}
В общем случае, когда станция существует в map, мы теперь выполняем только одну операцию с map, s := stationStats[station]
, чтобы хэширование названия станции и доступ к хэш-таблице должно было выполняться только один раз. Если оно уже есть в map (общий случай для одного миллиарда строк), то мы обновляем существующую struct, адресуемую указателем.
Это не особо помогает, но что-то даёт: использование значений указателей в map снижает время выполнения с 1 минуты 45 секунд до 1 минуты 31 секунды.
В третьем решении всё наконец становится более хардкорным: мы будем парсить температуру при помощи собственного кода, а не через strconv.ParseFloat
. Функция стандартной библиотеки обрабатывает кучу пограничных случаев, которые для простых температур входных данных нам поддерживать не нужно: у нас будет всего две или три цифры в формате 1.2
или 34.5
(и некоторые с минусом перед числом).
Кроме того, strconv.ParseFloat
получает аргумент string
, и теперь, когда мы её не используем, можно обойтись байтовым срезом непосредственно из Scanner.Bytes
, а не распределять и копировать строку при помощи Scanner.Text
.
Теперь мы парсим температуру вот так:
negative := false
index := 0
if tempBytes[index] == '-' {
index++
negative = true
}
temp := float64(tempBytes[index] - '0') // парсим первую цифру
index++
if tempBytes[index] != '.' {
temp = temp*10 + float64(tempBytes[index]-'0') // парсим опциональную вторую цифру
index++
}
index++ // skip '.'
temp += float64(tempBytes[index]-'0') / 10 // парсим десятичную цифру
if negative {
temp = -temp
}
Не особо красиво, но и сложного ничего нет. Так мы снизили время с 1 минуты 31 секунды до менее чем минуты: 55,8 секунды.
В былые времена команды с плавающей запятой были гораздо медленнее, чем целочисленные. Сегодня они лишь немного медленнее, но, вероятно, стоит по возможности избегать их.
В нашей задаче каждая температура имеет один разряд после запятой, так что для её описания можно легко использовать целые числа с фиксированной запятой. Например, мы можем представить 34,5 как целочисленное 345. И только в самом конце, перед непосредственным выводом результатов, мы преобразуем их обратно во float.
То есть моё четвёртое решение, по сути, такое же, как и решение 3, но со следующим полем struct stats
:
type stats struct {
min, max, count int32
sum int64
}
Перед выводом результатов его нужно разделить на 10:
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f",
station, float64(s.min)/10, mean, float64(s.max)/10)
Для минимальных и максимальных температур я использовал 32-битные integer, так как максимум, вероятно, будет примерно 500 (50 градусов Цельсия). Можно использовать int16
, но из прежнего опыта я сделал вывод, что современные 64-битные CPU чуть медленнее работают с 16-битными integer, чем с 32-битными. В моих тестах они не показали какой-то значимой разницы, но я всё равно выбрал 32-битные.
Использование integer снизило время с 55,8 секунды до 51,0 секунды — небольшой выигрыш.
Чтобы создать решение 5, я записал ещё один профиль (решения 4):
Итак, всё стало сложнее. Операции map доминируют, и переход к специализированной хэш-таблице может оказаться немного неочевидным. Поэтому мы избавимся от bufio.Scanner
. Давайте попрокрастинируем и избавимся от bytes.Cut
.
Я подумал, что это простой способ экономии времени. Взгляните на этот пример строки:
New Orleans;11.7
Будет быстрее парсить температуру с конца и находить ';'
там, чем сканировать всё название станции в поисках ';'
. Этот довольно уродливый код делает именно это:
end := len(line)
tenths := int32(line[end-1] - '0')
ones := int32(line[end-3] - '0') // line[end-2] is '.'
var temp int32
var semicolon int
if line[end-4] == ';' { // положительная температура N.N
temp = ones*10 + tenths
semicolon = end - 4
} else if line[end-4] == '-' { // отрицательная температура -N.N
temp = -(ones*10 + tenths)
semicolon = end - 5
} else {
tens := int32(line[end-4] - '0')
if line[end-5] == ';' { // положительная температура NN.N
temp = tens*100 + ones*10 + tenths
semicolon = end - 5
} else { // отрицательная температура -NN.N
temp = -(tens*100 + ones*10 + tenths)
semicolon = end - 6
}
}
station := line[:semicolon]
Отказавшись от bytes.Cut
, мы снизили время с 51,0 секунды до 46,0 секунды — ещё одна маленькая победа.
Теперь мы попробуем избавиться от bufio.Scanner
. Задумайтесь: чтобы найти конец каждой строки файла, сканеру приходится обходить все байты в поисках символа переноса строки. Затем мы снова обрабатываем множество байтов, чтобы спарсить температуру и найти ';'
. Так что давайте попробуем объединить эти этапы и избавимся от bufio.Scanner
.
В решении 6 мы распределяем буфер на 1 МБ для чтения файла большими блоками, ищем последний символ переноса строки в блоке, чтобы убедиться, что мы не разрезаем строку пополам, а затем обрабатываем каждый блок. Это выглядит так:
buf := make([]byte, 1024*1024)
readStart := 0
for {
n, err := f.Read(buf[readStart:])
if err != nil && err != io.EOF {
return err
}
if readStart+n == 0 {
break
}
chunk := buf[:readStart+n]
newline := bytes.LastIndexByte(chunk, '\n')
if newline < 0 {
break
}
remaining := chunk[newline+1:]
chunk = chunk[:newline+1]
for {
station, after, hasSemi := bytes.Cut(chunk, []byte(";"))
// ... дальше та же обработка температуры, что и в r4 ...
Устранениеbufio.Scanner
и выполнение собственного сканирования снизило время с 46,0 секунды до 41,3 секунды. Ещё одна крошечная победа, но мы примем её.
На решении 7 шутки заканчиваются. Мы реализуем собственную хэш-таблицу вместо map
Go. У этого подхода есть два преимущества:
Мы сможем хэшировать название станции в процессе поиска ';'
, избегая повторной обработки байтов.
Мы можем хранить каждый ключ в нашей хэш-таблице как байтовый срез, избегая необходимости преобразовывать каждый ключ в string
(при этом выполняется распределение и копирование для каждой строки файла).
Я писал о том, как реализовать хэш-таблицу на C, но также я реализовал и собственную «счётную» хэш-таблицу на Go, откуда и взял эту реализацию.
Это простая реализация, использующая алгоритм хэширования FNV-1a с линейным зондированием: если возникает коллизия, задействуется следующий пустой слот.
Чтобы упростить, я заранее распределяю большое количество хэш-бакетов (я использовал 100000), чтобы избежать необходимости написания логики изменения размера страницы. Если таблица заполнится больше, чем наполовину, у кода произойдёт panic. Я замерил, что мы получим примерно 2% коллизий хэшей.
На этот раз кода гораздо больше — подготовка хэш‑таблицы, само хэширование, зондирование таблицы и вставка:
// Структура хэш-таблицы:
type item struct {
key []byte
stat *stats
}
items := make([]item, 100000) // хэш-бакеты с линейным зондированием
size := 0 // количество активных элементов в срезе элементов
buf := make([]byte, 1024*1024)
readStart := 0
for {
// ... то же разбиение на блоки, что и в r6 ...
for {
const (
// 64-битные константы FNV-1 из hash/fnv.
offset64 = 14695981039346656037
prime64 = 1099511628211
)
// Хэшируем название станции и ищем ';'.
var station, after []byte
hash := uint64(offset64)
i := 0
for ; i < len(chunk); i++ {
c := chunk[i]
if c == ';' {
station = chunk[:i]
after = chunk[i+1:]
break
}
hash ^= uint64(c) // FNV-1a is XOR then *
hash *= prime64
}
if i == len(chunk) {
break
}
// ... тот же парсинг температур, что и в r6 ...
// Переходим к нужному бакету в хэш-таблице.
hashIndex := int(hash & uint64(len(items)-1))
for {
if items[hashIndex].key == nil {
// Найден пустой слот, добавляем новый элемент (копируем ключ).
key := make([]byte, len(station))
copy(key, station)
items[hashIndex] = item{
key: key,
stat: &stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
},
}
size++
if size > len(items)/2 {
panic("too many items in hash table")
}
break
}
if bytes.Equal(items[hashIndex].key, station) {
// Найден совпадающий слот, прибавляем к имеющейся статистике.
s := items[hashIndex].stat
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
break
}
// Слот уже содержит другой ключ, пробуем следующий слот (линейное зондирование).
hashIndex++
if hashIndex >= len(items) {
hashIndex = 0
}
}
}
readStart = copy(buf, remaining)
}
Отдача от всего этого кода велика: специальная хэш-таблица снижает время с 41,3 до 25,8 секунды.
В решении 8 я хотел добавить параллелизм. Однако я решил вернуться к простому и идиоматичному коду из первого решения, использующему bufio.Scanner
и strconv.ParseFloat
, распараллелив его. Так мы увидим, что даёт лучшие результаты, оптимизация или параллелизация, а в девятом решении реализуем и то, и другое.
Задачу map-reduce параллелизировать очень легко: разбить файл на блоки схожего размера (по одному на каждое ядро CPU), запустить поток (в Go горутину) для обработки каждого блока, а в конце объединить результаты.
Вот, как это выглядит на высоком уровне:
// Определяем непересекающиеся части для разбиения файла (каждая часть имеет смещение и размер).
parts, err := splitFile(inputPath, maxGoroutines)
if err != nil {
return err
}
// Запускаем горутину для обработки каждой части, возвращая результаты в канал.
resultsCh := make(chan map[string]r8Stats)
for _, part := range parts {
go r8ProcessPart(inputPath, part.offset, part.size, resultsCh)
}
// Ждём возврата результатов и агрегируем их.
totals := make(map[string]r8Stats)
for i := 0; i < len(parts); i++ {
result := <-resultsCh
for station, s := range result {
ts, ok := totals[station]
if !ok {
totals[station] = r8Stats{
min: s.min,
max: s.max,
sum: s.sum,
count: s.count,
}
continue
}
ts.min = min(ts.min, s.min)
ts.max = max(ts.max, s.max)
ts.sum += s.sum
ts.count += s.count
totals[station] = ts
}
}
Функция splitFile
довольно скучна, поэтому я не стал её сюда включать. Она смотрит на размер файла, делит его на нужное нам количество частей, а затем ищет каждую часть, считывая 100 байтов перед концом и находя последний символ переноса строки, чтобы гарантировать, что каждая часть заканчивается полной строкой файла.
Функция r8ProcessPart
, по сути, такая же, как и в решении r1, но она начинает с перехода к смещению части и ограничивает длину размером части (при помощи io.LimitedReader
). Закончив, она отправляет собственную map статистики обратно в канал:
func r8ProcessPart(inputPath string, fileOffset, fileSize int64,
resultsCh chan map[string]r8Stats) {
file, err := os.Open(inputPath)
if err != nil {
panic(err)
}
defer file.Close()
_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
panic(err)
}
f := io.LimitedReader{R: file, N: fileSize}
stationStats := make(map[string]r8Stats)
scanner := bufio.NewScanner(&f)
for scanner.Scan() {
// ... та же обработка, что и в r1 ...
}
resultsCh <- stationStats
}
Параллельная обработка входного файла обеспечивает существенный выигрыш по сравнению с r1, снижая время с 1 минуты 45 секунд до 24,3 секунды. Для сравнения: предыдущая «оптимизированная непараллельная» версия (решение 7) занимала 25,8 секунды. То есть в этом случае параллелизация чуть быстрее оптимизации, к тому же намного проще.
В решении 9, нашей последней попытке, мы просто соединим все оптимизации с r1 по r7 с параллелизацией, реализованной в r8.
Я использовал ту же функцию splitFile
из r8, а остальную часть кода просто скопировал из r7, поэтому здесь нет ничего нового. За исключением результатов: эта окончательная версия снизила время с 24,3 секунды до 3,99 секунды — огромная победа.
Любопытно, что поскольку вся реальная обработка теперь находится в одной большой функции r9ProcessPart
, граф профиля больше не особо полезен. Вот как он теперь выглядит:
Как видите, 82% времени тратится на r9ProcessPart
, bytes.Equal
занимает 13%, а считывание файла занимает оставшиеся 5%.
Если мы хотим добиться более подробного профилирования, нам нужно уйти глубже, чем уровень функций, который нам показывает режим графа, и использовать режим исходников. Вот внутренний цикл:
Меня этот отчёт сбил с толку. Почему в нём показано, что if items[hashIndex].key == nil
занимает 5,01 с, но у вызова bytes.Equal
показано всего 390 мс? Поиск среза ведь гораздо менее затратен, чем вызов функции? Если вы специалист по производительности Go и поможете мне это проинтерпретировать, то напишите!
Как бы то ни было, я уверен, что можно придумать и более безумные оптимизации, но я решил остановиться на этом. Для меня обработка миллиарда строк за 4 секунды, или по 250 миллиона строк в секунду — это вполне достаточно.
Ниже приведена таблица со всеми моими решениями на Go, а также с самыми быстрыми решениями на Go и на Java. Каждый результат — лучший из пяти прогонов решения с одинаковым входным файлом из миллиарда строк.
Версия | Описание | Время | Время относительно r1 |
---|---|---|---|
r1 | простое и идиоматичное | 1 мин 45 | 1,00 |
r2 | map со значениями указателей | 1 мин 31 | 1,15 |
r3 | парсинг температур вручную | 55,8 с | 1,87 |
r4 | целые числа с фиксированной запятой | 51,0 с | 2,05 |
r5 | избавляемся от | 46,0 с | 2,27 |
r6 | избавляемся от | 41,3 с | 2,53 |
r7 | специальная хэш-таблица | 25,8 с | 4,05 |
r8 | распараллеленное r1 | 24,3 с | 4,31 |
r9 | распараллеленное r7 | 3,99 с | 26,2 |
самая быстрая версия на Go | 2,90 с | 36,2 | |
самая быстрая версия на Java | 0,953 с | 110 |
Я приблизительно в том же интервале, что и версия на Go Александра Ястребова (AY). Его решение похоже на моё: разбиваем файл на блоки, используем специальную хэш-таблицу (он даже как и я использовал хэширование FNV) и парсим температуру как integer. Однако он использует файлы с отображением в память, от которых я отказался из соображений портируемости. Вероятно, поэтому его версия немного быстрее.
Томас Вюртенгер (при помощи других разработчиков) создал в исходном челлендже на Java самое быстрое решение. На моей машине оно выполняется меньше, чем за секунду, в четыре раза быстрее, чем моя версия на Go. Похоже, что наряду с параллельной обработкой и отображением файлов в память он использовал развёрнутые циклы, код парсинга без ветвления и другие низкоуровневые трюки.
Кажется, Томас — основатель и важный контрибьютор в GraalVM (быстрой Java Virtual Machine с компиляцией перед исполнением). Так что он определённо специалист в этой области. Отличная работа, Томас!
Почему всё это важно?
В случае большинства повседневных программных задач лучше всего начинать с простого и идиоматичного кода. Если вы вычисляете статистику для миллиарда температур, а ответ нужен лишь один раз, то 1 минуты 45 секунд, вам, вероятно, будет достаточно.
Но если вы создаёте конвейер обработки данных и сможете ускорить код в четыре, а то и в 26 раз, то вы не только порадуете пользователей, но и сможете серьёзно сэкономить в затратах на вычислительные ресурсы — если система хорошо загружена, то затраты на вычисления могут быть в 4 или 26 раза меньше исходных!
А если вы создаёте среду исполнения наподобие GraalVM или интерпретатор наподобие GoAWK, то такой уровень производительности очень важен: если вы ускорите интерпретатор, то и программы пользователей тоже будут выполняться гораздо быстрее.
Кроме того, просто интересно писать код, выжимающий из машины максимум.