golang

MapReduce на Go: превратите ваши большие данные в понятную карту и удобный редьюс

  • пятница, 19 июля 2024 г. в 00:00:07
https://habr.com/ru/companies/otus/articles/828672/

Привет, Хабр!

Часто задается вопрос: как эффективно и быстро обработать огромные объемы информации? Ответом на этот вызов стала концепция MapReduce, разработанная в недрах Google.

MapReduce — это парадигма программирования, созданная для обработки и генерации больших объемов данных с использованием параллельных распределенных алгоритмов. Основная фича проста: сначала данные разбиваются на небольшие части (фаза Map), а затем результаты этих частей агрегируются в финальный результат (фаза Reduce).

Зачем?

  1. Масштабируемость: MapReduce позволяет распределять задачи на множество узлов, что значительно ускоряет обработку больших данных.

  2. Производительность: Параллельное выполнение задач маппинга и редьюсинга обеспечивает порой очень высокую скорость обработки.

  3. Устойчивость к ошибкам: Встроенные механизмы MapReduce обеспечивают восстановление после сбоев, что плюсик к надежности.

  4. Простота использования: Разработчику нужно лишь определить функции Map и Reduce, а остальное берет на себя фреймворк.

В статье рассмотрим как реализовать MapReduce на Go, какие оптимизации можно применить для улучшения производительности и приведем примеры использования.

MapReduce

Архитектура

  1. Mapper

    • Маппер отвечает за обработку входных данных и преобразование их в промежуточные пары ключ-значение. На этапе маппинга входные данные разбиваются на более мелкие части, что позволяет их обрабатывать параллельно.

    • Каждый маппер получает часть входных данных, выполняет над ними определенные операции (например, разбиение текста на слова) и выдает пары ключ-значение (например, слово и количество его вхождений). Благодаря тому, что мапперы работают независимо друг от друга, этот этап легко масштабируется на большое количество узлов.

  2. Reducer

    • Редьюсер собирает промежуточные пары ключ-значение, сгруппированные по ключам, и выполняет над ними завершающие операции, такие как суммирование или среднее арифметическое.

    • Редьюсер получает все значения, ассоциированные с каждым уникальным ключом, и производит конечные результаты обработки (например, общее количество вхождений каждого слова). Как и мапперы, редьюсеры работают параллельно, обрабатывая различные группы ключей.

  3. Shuffler

    • Шафлер выполняет сортировку и группировку промежуточных данных, созданных мапперами, перед передачей их редьюсерам. Он гарантирует, что все данные с одинаковыми ключами будут обработаны одним редьюсером. После завершения этапа маппинга, промежуточные данные сортируются по ключам и распределяются между редьюсерами.

    • Шафлер также работает параллельно.

  4. Master Node

    • Координирующий узел управляет всей работой системы MapReduce. Он распределяет задачи маппинга и редьюсинга между рабочими узлами, отслеживает их состояние и обрабатывает сбои.

    • Координирующий узел распределяет входные данные между мапперами, собирает промежуточные результаты, передает их шффлеру и распределяет задачи редьюсинга.

    • Этот узел также отвечает за повторное выполнение задач, если какой-либо рабочий узел выходит из строя.

Реализация в коде

Реализуем такой процесс:

  1. Координирующий узел получает запрос на выполнение задачи и разбивает входные данные на фрагменты.

  2. Рабочие узлы маппинга получают эти фрагменты и выполняют операции преобразования, генерируя промежуточные пары ключ-значение.

  3. Шффлер сортирует и группирует эти промежуточные данные, распределяя их между редьюсерами.

  4. Рабочие узлы редьюсинга получают сгруппированные данные и выполняют завершающие операции, генерируя конечные результаты.

  5. Координирующий узел собирает результаты от всех редьюсеров и возвращает их пользователю или сохраняет в базе данных.

Приступим.

Координирующий узел управляет всем процессом, начиная с получения входных данных и их разбиения на фрагменты, и заканчивая сбором конечных результатов от редьюсеров:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
)

// структура для хранения задачи
type Task struct {
    filename string
}

// Главная функция
func main(г) {
    // файл с данными
    filename := "input.txt"
    // число мапперов и редьюсеров
    numMappers := 3
    numReducers := 2

    // создаем канал для передачи задач мапперам
    mapTasks := make(chan Task, numMappers)
    // создаем канал для передачи промежуточных данных шффлеру
    intermediateData := make(chan map[string]int, numMappers)
    // создаем канал для передачи данных редьюсерам
    reduceTasks := make(chan map[string]int, numReducers)

    var wg sync.WaitGroup

    // запуск мапперов
    for i := 0; i < numMappers; i++ {
        wg.Add(1)
        go mapper(mapTasks, intermediateData, &wg)
    }

    // запуск шафлера
    go shuffler(intermediateData, reduceTasks, numMappers)

    // запуск редьюсеров
    for i := 0; i < numReducers; i++ {
        wg.Add(1)
        go reducer(reduceTasks, &wg)
    }

    // разбиение файла на задачи и отправка мапперам
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("Не удалось открыть файл: %s", err)
    }
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        mapTasks <- Task{filename: scanner.Text()}
    }
    close(mapTasks)

    // ожидание завершения всех горутин
    wg.Wait()
    close(intermediateData)
    close(reduceTasks)

    fmt.Println("MapReduce завершен.")
}

Мапперы получают фрагменты входных данных и преобразуют их в промежуточные пары ключ-значение:

// функция маппера
func mapper(tasks <-chan Task, intermediateData chan<- map[string]int, wg *sync.WaitGroup) {
    defer wg.Done()

    for task := range tasks {
        file, err := os.Open(task.filename)
        if err != nil {
            log.Fatalf("Не удалось открыть файл: %s", err)
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        counts := make(map[string]int)
        for scanner.Scan() {
            line := scanner.Text()
            words := strings.Fields(line)
            for _, word := range words {
                counts[word]++
            }
        }
        intermediateData <- counts
    }
}

Шафлер сортирует и группирует промежуточные данные, распределяя их между редьюсерами:

// функция шафлера
func shuffler(intermediateData <-chan map[string]int, reduceTasks chan<- map[string]int, numMappers int) {
    aggregatedData := make(map[string]int)

    for i := 0; i < numMappers; i++ {
        for data := range intermediateData {
            for key, value := range data {
                aggregatedData[key] += value
            }
        }
    }

    reduceTasks <- aggregatedData
}

Редьюсеры получают сгруппированные данные и выполняют завершающие операции, генерируя конечные результаты:

// функция редьюсера
func reducer(reduceTasks <-chan map[string]int, wg *sync.WaitGroup) {
    defer wg.Done()

    for task := range reduceTasks {
        finalCounts := make(map[string]int)
        for key, value := range task {
            finalCounts[key] += value
        }
        // выводим результаты
        for word, count := range finalCounts {
            fmt.Printf("%s: %d\n", word, count)
        }
    }
}

Координирующий узел собирает результаты от всех редьюсеров и возвращает их пользователю или сохраняет в БД:

// главная функция
func main() {
    // пример файла с данными
    filename := "input.txt"
    // число мапперов и редьюсеров
    numMappers := 3
    numReducers := 2

    // создаем канал для передачи задач мапперам
    mapTasks := make(chan Task, numMappers)
    // создаем канал для передачи промежуточных данных шафлеру
    intermediateData := make(chan map[string]int, numMappers)
    // создаем канал для передачи данных редьюсерам
    reduceTasks := make(chan map[string]int, numReducers)

    var wg sync.WaitGroup

    // запуск мапперов
    for i := 0; i < numMappers; i++ {
        wg.Add(1)
        go mapper(mapTasks, intermediateData, &wg)
    }

    // запуск шафлера
    go shuffler(intermediateData, reduceTasks, numMappers)

    // запуск редьюсеров
    for i := 0; i < numReducers; i++ {
        wg.Add(1)
        go reducer(reduceTasks, &wg)
    }

    // разбиение файла на задачи и отправка мапперам
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("Не удалось открыть файл: %s", err)
    }
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        mapTasks <- Task{filename: scanner.Text()}
    }
    close(mapTasks)

    // ожидание завершения всех горутин
    wg.Wait()
    close(intermediateData)
    close(reduceTasks)

    fmt.Println("MapReduce завершен.")
}

В каких кейсах MapReduce находит применение

Обработка логов

Обработка логов — это типикал задача для MapReduce, особенно там, где объемы логов могут достигать терабайтов данных ежедневно. Логи могут включать информацию о системных событиях, пользовательских действиях, ошибках и многом другом.

  • Map: На этапе маппинга каждый лог‑файл обрабатывается для извлечения ключевых данных, таких как временные метки, типы событий и идентификаторы пользователей. Каждый маппер генерирует промежуточные пары ключ‑значение, где ключом может быть, например, тип события, а значением — информация об этом событии.

  • Shuffle: На этапе шффлинга данные сортируются и группируются по ключам, что позволяет собрать все события одного типа вместе.

  • Reduce: На этапе редьюсинга агрегируются и анализируются данные. Например, подсчитывается количество каждого типа событий, определяется количество уникальных пользователей и анализируются временные метки для выявления пиков активности.

Анализ текстов

  • Map: Каждый документ разбивается на отдельные слова, которые затем преобразуются в пары ключ‑значение, где ключ — это слово, а значение — единица.

  • Shuffle: Пары ключ‑значение сортируются и группируются по ключам, что позволяет собрать все вхождения каждого слова вместе.

  • Reduce: В редьюсерах подсчитывается количество вхождений каждого слова, что позволяет получить частотный словарь.

Анализ Clickstream

Анализ clickstream данных позволяет понимать поведение пользователей на их веб‑сайтах и мобильных приложениях.

  • Map: Каждый clickstream лог обрабатывается для извлечения данных о действиях пользователя.

  • Shuffle: Данные сортируются и группируются по пользователям или сессиям, что позволяет собрать всю информацию о действиях одного пользователя вместе.

  • Reduce: В редьюсерах анализируются данные о поведении пользователей, что позволяет выявить популярные страницы, типичные пути пользователей и потенциальные узкие места в пользовательском интерфейсе.


MapReduce позволяет решать сложные задачи анализа данных, распределяя нагрузку и тем самым обеспечивая высокую производительность и масштабируемость.

В заключение напомню о ближайших открытых уроках:

  • 18 июля: Дженерики в Go. На вебинаре вы узнаете механизмы обобщенного программирования с использованием дженериков. Мы рассмотрим внутренние механизмы работы дженериков в Go, а также примеры использования. Запись по ссылке

  • 25 июля: Как сделать быстрорастущий сервис с помощью трейсинга? На вебинаре мы наглядно рассмотрим работу сервиса под нагрузкой и найдем запрос с помощью трейсинга. Покажем кейсы, когда уже есть логирование. Запись по ссылке