golang

Как мы пишем ML-приложения с использованием паттерна пайплайнов

  • суббота, 5 апреля 2025 г. в 00:00:12
https://habr.com/ru/companies/tbank/articles/897496/

Привет, Хабр! Я Тимофей Милованов, ведущий Golang-разработчик в команде VoiceKit, где мы занимаемся голосовыми технологиями. Мы разрабатываем сервисы по распознаванию и синтезу голоса, преобразованию одного голоса в другой, а еще голосовой биометрией.

Расскажу о том, почему структура этих сервисов похожа на пайплайн, почему Golang отлично подходит для реализации пайплайнов и как мы написали свою библиотеку для этих пайплайнов.

Жизненный цикл ML-модели

ML невероятно обширный, поэтому нужно четко очертить, о какой его части мы будем говорить. У модели есть жизненный цикл, который упрощенно можно разделить на три этапа: 

  • подготовка данных;

  • тренировка модели;

  • использование модели — продакшен.

Процесс разработки и внедрения ML-модели
Процесс разработки и внедрения ML-модели

Этап обучения — работа ML-инженера. После того как модель обучена, она переходит в ведение backend-разработчика, и для него это набор файлов. Ее можно подгрузить прямо в основной сервис вместе с бизнес-логикой, а можно использовать в отдельном сервисе и ходить, например, по gRPC. Поэтому модель, с точки зрения backend-разработчика, можно считать таким же строительным блоком, как базу данных или очередь. В этой статье мы поговорим именно про этот этап — этап внедрения моделей в backend.

Что понимаем под ML-приложением

Давайте выдумаем какое-то обычное, не ML-приложение, например маркетплейс. В нем будет несколько сервисов: корзина, платежи, избранное. А еще, допустим, мы решили использовать Chat GPT, чтобы он подсказал, что можно купить в подарок другу, и сделали микросервис-прокси или даже подняли собственную небольшую модель.

Упрощенная схема среднестатистического приложения
Упрощенная схема среднестатистического приложения

Несмотря на использование ML-модели, мы будем называть приложение обычным, так как у него привычный нам флоу запроса — сходить в логически слабо связанные сервисы, сгруппировать данные и выдать результат.

Теперь рассмотрим ML-приложение на примере Speech To Text, то есть на примере преобразования голоса в текст.

Цепочка получения текста из аудио
Цепочка получения текста из аудио

На схеме изображено несколько компонент:

  • акустическая модель выдает вероятности произнесенных букв в кусочке аудио;

  • языковая модель собирает эти вероятности в слова и предложения;

  • модель пунктуации расставляет заглавные буквы и знаки препинания.

В приложении флоу запроса — сходить в несколько сервисов (ML-моделей), естественным образом связанных между собой, и выдать конечный результат. Подобный способ решения задачи — не только в Speech To Text. То же самое будет наблюдаться и в генерации аудио из текста, и в генерации изображений. При этом в ходе запроса:

  • данные итеративно преобразуются;

  • вход каждой итерации — большей частью выход предыдущей.

Если подумать, как назвать эти два пункта одним словом, сразу можно понять, что это и есть пайплайн.

Пайплайны

Широкоизвестные пайплайны:

  • Unix pipelines, в которых мы можем передать выход одной команды на вход другой команде и получить что-то более сложное, чем просто маленькие команды.

>t.milovanov@pentagon:~
$ cat election_results.txt | grep "Donald Trump"| wc -l
  • GitLab pipelines, в которых мы можем в одном шаге собрать docker-образ, в другом прогнать unit-тесты и, передавая артефакты из одного шага в другой, решить задачу. Например, вылить новый релиз в production.

Типичный пайплайн в Gitlab
Типичный пайплайн в Gitlab

ML-приложение с использованием паттерна пайплайнов

В Go тоже есть пайплайны. Многие познакомились с этой концепцией в статье 2014 года, которая так и называется — Go Cuncurrency Patterns: Pipeline and cancellation. 

Попробуем реализовать пайплайн распознавания речи с использованием этого архитектурного подхода. Для начала нарисуем наш пайплайн. Пусть это будет какой-то минимум, который состоит из четырех шагов. Упростим пайплайн до одной модели, которая как будто бы сразу умеет преобразовывать аудио в текст.

Игрушечный пайплайн распознавания речи
Игрушечный пайплайн распознавания речи

Уточним, что нам нужен стриминговый сервис по распознаванию. Мы не хотим дожидаться окончания аудио, а будем выдавать распознанный кусочек текста сразу, как только нам пришло аудио.

Распознавание стриминговое!
Распознавание стриминговое!

Воспользуемся gRPC, чтобы предоставить API. Это даст нам два дополнительных шага пайплайна: чтение из запроса и запись ответа. А между ними встроим модель, которая будет переводить голос в текст.

Финальный пайплайн, который мы будем реализовывать.
Финальный пайплайн, который мы будем реализовывать.

Теперь попытаемся написать это на Go и будем опираться на уже упомянутую o Cuncurrency Patterns: Pipeline and cancellation». 

Реализуем первый шаг пайплайна. Во всех шагах мы будем придерживаться следующей структуры: инициализация канала и его немедленный возврат из функции, а логика шага (обработка данных), которая пишет в этот канал, запускается в отдельной горутине.

func readAudio(grpcStream Stream) <-chan []byte {
	out := make(chan []byte)

	go func() {
		defer close(out)
		for {
			req, err := grpcStream.Recv()
			if err != nil {
				handleError(err)
				return
    		}
			out <- req.GetData()
		}
	}()

	return out
}

Второй шаг: эмулируем поход в модель. В данном случае файл модели может быть подгружен в сам сервис либо хоститься в отдельном сервисе (например, с помощью Triton Inference Server) — для структуры кода это неважно. В шаге мы пропустили обработку ошибок для краткости кода.

func sendToModel(in <-chan []byte) chan []string{
	out := make(chan []string)

	go func() {
		defer close(out)
		for audioBytes := range in {
			resp := modelClient.Infer(audioBytes)
			out <- resp
		}
	}()

	return out
}

Третий шаг будет еще проще по структуре. Так как это финальный шаг, отправлять данные по пайплайну дальше не нужно и мы только читаем данные, которые пришли нам от модели, и пишем их в gRPC stream. Обработка ошибок тоже опущена для краткости и наглядности.

func sendResponse(grpcStream Stream, in <-chan string) {
	for textFromAudio := range in {
		grpcStream.Send(&Response{
			ResultText: textFromAudio
		})
	}
}

Теперь объединим шаги в цельный пайплайн, завернув шаги один в другой. Вызов пайплайна в коде:

func SpeechToTextRequest(stream Stream) {
	sendResponse(
		stream,
		sendToModel(
			readAudio(stream),
		),
	)
}

Добавим в пайплайн немного сложности. Допустим, аудио нужно залогировать, чтобы доучить модель в будущем или воспроизвести запрос в случае бага. Но хорошо бы, чтобы этот шаг был опциональным, потому что мы не хотим логировать данные, если в аудио окажется персональная информация.

Усложненный пайплайн с логированием
Усложненный пайплайн с логированием

Напишем код операции для логирования аудио. Будем накапливать аудио из канала и в конце, когда аудио закончилось, залогируем.

func logAudio(in <-chan []byte) {
	var buffer []byte
	for chunk := range in {
		buffer = append(buffer, chunk)
	}

	audioLogger.Log(buffer)
}

Заметим, что на этом шаге мы не можем использовать тот же канал, что передаем в шаг STT Model. Мы знаем, что в Go нельзя прочитать значение из канала дважды. 

Мы должны задублировать данные из шага чтения аудио. Для этого существует паттерн, который называется Fan Out, и он также упоминается в статье, в параграфе Fan-out, fan-in. Но в нашем случае есть отличие. 

В оригинале под этой идеей описана передача одного и того же канала в несколько разных воркеров для распараллеливания одинаковой работы и лучшей утилизации CPU. 

В нашем случае нужен копирующий fan out, который будет копировать данные из одного канала в два новых, чтобы два разных шага совершили разную работу. После написания кода четвертого шага и функции fan out можем полностью собрать пайплайн:

func SpeechToTextRequest(stream Stream) error {
	audioCopies := fanOut(readAudio(stream), 2)
	audio, audioCopy := audioCopies[0], audioCopies[1]

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		sendResponse(stream, sendToModel(audio))
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		audioLogger(audioCopy)
	}()

	wg.Wait()

	return nil
}

С появлением одного шага код усложнился: добавились примитивы синхронизации, из-за этого код стало сложнее читать. Мы получили (субъективно) сложный код на пайплайне из 4 шагов. В реальном пайплайне распознавания речи 30+ шагов. В связи с такой высокой сложностью сборка пайплайна «в лоб» представляется очень запутанной и сложной. Поэтому напрашивается идея вынести инфраструктурный код и конструирование пайплайна в небольшую библиотеку.

Библиотека пайплайнов

Для начала помечтаем, чего мы хотим. У нас в коде, где мы собираем пайплайн, появились примитивы синхронизации. Нам было бы удобнее видеть и править только бизнесовый код: какой шаг за каким идет и что принимает. Примерно так:

func SpeechToTextRequest(stream Stream) error {
	addStage(audioReader)
	addStage(speechToTextModel)
	if EnableAudioLogging {
		addStage(audioLogger)
	}
	addStage(responseWriter)
}

Это хороший код, потому что он плоский и в нем нет асинхронного кода. Глядя на такой код, проще собрать в голове всю схему пайплайна. И если разработчику нужно будет добавить новый шаг, он быстро поймет, в каком месте это нужно сделать, и это вряд ли приведет к багам из-за того, что асинхронной сущности пайплайна теперь не видно.

Cборка пайплайна с использованием нашей библиотеки. Начнем не с внутренностей библиотеки, а с того, как она используется. Продемонстрируем сборку пайплайна из четырех шагов (вместе с логированием аудио) с использованием библиотеки.

func SpeechToTextRequest(stream Stream) {
	pipeline := pipelines.New()

	// Создали все необходимые операции
	audioReaderStage := NewAudioReader(stream)
	audioFanOut := pipelines.NewCopyFanOut()
	modelStage := NewSpeechToTextModel()
	audioLoggerStage := NewAudioLogger()
	responseWriterStage := NewResponseWriter(stream)

	// Добавили операцию reader
	readerOperationID := pipeline.AddStage(ctx,
		audioReaderStage, []pipelines.InputSpec{},
	)

	// Добавили операцию fan out
	audioFanOutID := pipeline.AddStage(ctx,
		audioFanOut, []pipelines.InputSpec{{
			OperationID: readerOperationID,
			OutputIndex: 0,
		}},
	)

	// Добавили операцию обращения к модели
	modelOperationID := pipeline.AddStage(ctx,
		modelStage, []pipelines.InputSpec{{
			OperationID: readerOperationID,
			OutputIndex: 0,
		}},
	)

	// Добавили операцию writer
	_ = pipeline.AddStage(ctx,
		responseWriterStage, []pipelines.InputSpec{{
			OperationID: modelOperationID,
			OutputIndex: 0,
		}},
	)

	// Добавили операцию логирования
	if EnableAudioLogging {
		_ = pipeline.AddStage(ctx,
			audioLoggerStage, []pipelines.InputSpec{{
				OperationID: audioFanOutID,
				OutputIndex: 1,
			}},
		)
	}

	ctx := stream.Context()
	pipeline.Run(ctx)
}

Код получился несколько длиннее, чем мы хотели, но мы смогли оставить его структурно аналогичным тому, что мы представляли, —плоским и без использования примитивов синхронизации. 

Разберемся с некоторыми непонятными моментами, которых не было в нашем идеальном коде. Например, с выделенным кусочком кода при добавлении операции в пайплайн.

modelOperationID := pipeline.AddStage(ctx,
	modelStage, []pipelines.InputSpec{{
		OperationID: audioFanOutID,
		OutputIndex: 0,
	}},
)

Для этого сначала посмотрим на схему пайплайна и добавим в нее некоторые дополнительные данные, а именно: пронумеруем входы и выходы всех шагов, начиная с нуля:

Если перенести это в код, получится:

А если убрать отдельную переменную sttModelInputSpec, получим ту «магию», с которой начинали:

Дальше разберемся со второй непонятной частью — с тем, как устроены шаги пайплайна. Раньше это были отдельные функции, а теперь мы собираем какие-то объекты:

Шаг пайплайна — это структура с главной функцией Run. О ней нужно знать три вещи: принимает на вход контекст запроса, возвращает ошибку, принимает и передает данные через специальные методы контекста Send и Recv:

func (r *STTModelStage) Run(ctx pipelines.Context) error {
	for {
		item, done := ctx.Recv(0)
		if done {
			return nil
		}

		// Отлично, работаем дальше...
	}
}

Важный момент — как и когда пайплайн должен завершаться. Когда мы составляли пайплайн на основе библиотеки, этим вопросом не задавались. Пайплайн завершался, только когда самый первый шаг закрывал канал, то есть когда кончались данные. На самом деле мы должны остановиться как минимум в следующие моменты:

  • запрос закончился, данных больше нет;

  • пользователь отменил запрос;

  • сервис необходимо перезагрузить (например, мы релизим новую версию);

  • произошла ошибка в критичном участке пайплайна (например, проблемы на зависимом сервисе, недоступна модель).

С точки зрения шага пайплайна эта логика находится в очерченном красным квадратом участке. Если контекст сигнализировал об окончании работы, шаг делает всю необходимую очистку (например, закрывает соединения к нижележащим сервисам) и выходит:

Но контекст, используемый в пайплайне, — не привычный Context из стандартного пакета Context, а кастомный. На самом деле это тонкая обертка над ним. В Inputs приходит то, что шаг обрабатывает, в Outputs уходят данные после обработки:

type stageContext struct {
	inputs           []chan interface{}
	outputs          []chan interface{}
	pipelinesContext context.Context
}

Таблица для значения Done, которое вызывающий код получает на ctx.Recv:

// <Is input channel closed?>, <is pipeline context done?> -> should caller finish its job
// false, false -> false
// false, true  -> true
// true, false  -> true
// true, true   -> true
func (c *nodeContext) Recv(inputIndex int) (value interface{}, returnFromCaller bool) {

С учетом этого знания финальный код для функции Recv:

func (c *nodeContext) Recv(inputIndex int) (value interface{}, returnFromCaller bool) {
	if inputIndex >= len(c.inputs) {
		return nil, true
	}

	value, ok := <-c.inputs[inputIndex]
	if !ok {
		return value, true
	}

	return value, internal.IsCtxDone(c.pipelineContext)
}

Нам осталось понять, как происходит запуск пайплайна, то есть что находится внутри функции Run. Именно туда ушла вся асинхронность. Внутри простая error-группа, внутри которой мы запускаем каждый шаг (ноду пайплайна):

func (p *Pipeline) run(ctx context.Context) error {
	p.logStart(ctx)
	observeStartMetrics(p.pipelineType)
	result := p.run(ctx)
	observeFinishMetrics(p.pipelineType, result)
	p.logEnd(ctx, result)
	return result
}

Нода — это небольшая обертка над операцией. Ее функция Run отвечает за закрытие выходов этой операции и обработку паники:

func (pn *pipelineNode) Run(ctx context.Context) (err /*we need the naming return because of defer recovery*/ error) {
	defer func() {
		for _, ch := range pn.Outputs {
			close(ch)
		}
	}()

	defer func() {
		if r := recover(); r != nil {
			err = &PanicError{PanicValue: r, Stack: debug.Stack()}
		}
	}()

	nodeContext := newContext(ctx, pn.Inputs, pn.Outputs, nodeStat)

	if e := pn.Operation.Run(nodeContext); e != nil {
		err = &ErrorInfo{error: e, operationType: reflect.TypeOf(pn.Operation)}
	}

	return err
}

Осталось добавить немного полезностей, например метрики и логи:

Итоги внедрения библиотеки:

  • Получили плоский код создания сложных пайплайнов.

  • Отделили инфраструктурный код от бизнесового.

  • Стандартизировали API для большой команды.

Мы не публикуем в open source библиотеку, про которую рассказываем в статье. Для этого есть несколько причин: мы не писали документацию, достаточно качественную для использования широким пользователем, и не готовы ревьюить Merge Request в GitHub или отвечать на Issues.

К тому же спустя некоторое время после того, как мы написали свою библиотеку, в open source вышли похожие библиотеки, например google/go-pipeline или deliveryhero/pipeline. 

Глобальные выводы

ML-приложение — это не всегда Python. Если вы обучали на Python-фреймворке, это не значит, что нужно обязательно использовать его и в Production.

ML-приложение — это часто пайплайн. А Go хорош в пайплайнах. Если допилить напильником классические подходы, может стать вообще хорошо. Если остались вопросы или хотите поделиться своим опытом и идеями — добро пожаловать в комментарии! 

В этой статье я старался показать подходы к использованию пайплайнов для организации кода, подсвечивание сложностей, с которыми может столкнуться разработчик при их использовании, и пути решения.