Как создать YouTube GIF Maker с использованием Next.js, Node и RabbitMQ
- четверг, 15 февраля 2024 г. в 00:00:16
Полный код проекта можно найти на github, а по этой ссылке посмотреть демо-версию приложения.
Идея приложения — дать пользователям возможность создавать GIF-файлы из видеороликов с YouTube. Для этого нужно просто указать ссылку на нужное видео, а также задать время начала и окончания GIF.
Создание GIF из видео на YouTube с определённым временным диапазоном.
Предварительный просмотр результата перед выполнением фактической конвертации.
Каждый компонент системы отвечает за работу в отдельности и взаимодействие с другими компонентами.
Клиентская часть на React (Next.js). Общается с бэкенд-сервером для создания новых запросов на конвертацию GIF, а также для получения информации об уже созданных конвертациях.
Бэкенд-сервер на Node. Обрабатывает запросы от клиента, а также отправляет новые задачи на конвертацию в брокер сообщений.
Node как Service Worker. Будет отвечать за выполнение / обработку задач по конвертации.
В качестве брокера сообщений — RabbitMQ. Будет выступать в качестве очереди задач (Task Queue), в которую бэкенд-сервер будет отправлять задачи, а Service Worker — потреблять задачи из неё.
Для хранения данных — MongoDB. Будет хранить информацию о задачах по конвертации GIF.
Для хранения медиафайлов — Google Cloud Storage. Будет использоваться для хранения конвертированных GIF-файлов.
Это диаграмма показывает поток данных, включающий все компоненты, упомянутые выше.
Давайте подробнее рассмотрим бэкенд-сервер.
Как видно из приведённой выше диаграммы, бэкенд-сервер выполняет три основные функции:
Обработка запросов на конвертацию GIF путём создания новой записи задачи в базе данных.
Отправка событий в RabbitMQ, указывающих, что была создана новая задача на конвертацию (постановка задач в очередь).
Обработка запросов на получение задач путём запроса задачи по её ID из базы данных и возврата соответствующего ответа.
Архитектура приложения состоит из трёх основных компонентов:
Обработчик маршрутов
Контроллер
Сервис
Каждый из них обладает определённым функционалом, который мы сейчас рассмотрим подробнее; а также разберёмся, почему архитектура построена именно таким образом.
Отвечает за распределение запросов по обработчикам различных маршрутов. Обычно эти обработчики маршрутов состоят из массива обработчиков, которые мы называем "Middleware Chain", а конечным обработчиком в этой цепочке является контроллер маршрута (Route Controller).
Middleware Chain обычно отвечает за выполнение «проверок» входящего запроса, а также в некоторых случаях за модификацию объекта запроса. В нашем случае мы будем выполнять проверку с помощью своего обработчика-валидатора.
Извлекает данные из запроса, а также очищает при необходимости.
Делегирует управление соответствующему сервису.
Обрабатывает ответы.
Перенаправляет ошибки промежуточному обработчику ошибок.
Содержит всю бизнес-логику.
Имеет доступ к данным с помощью уровня доступа к данным (ORM/ODM).
Контроллеры должны быть глупыми, то есть не иметь никаких подробностей о бизнес-логике. Всё, что они знают, это «какой сервис может обработать этот запрос», «какие данные нужны этому сервису», «как должен выглядеть ответ». Это позволяет избежать наличия толстых контроллеров (Fat Controllers).
В этом проекте мы используем TypeORM, который представляет собой ORM с поддержкой TypeScript и поддерживает множество баз данных (мы будем использовать MongoDB).
Мы представим каждую конвертацию GIF в виде задачи (Job), которая будет нашей единственной коллекцией (Collection).
Job Collection в TypeORM выглядит так:
import { BaseEntity, Entity, ObjectID, Column, CreateDateColumn, UpdateDateColumn, ObjectIdColumn } from 'typeorm';
@Entity('jobs')
export class Job extends BaseEntity {
@ObjectIdColumn()
id: ObjectID;
@Column({
nullable: false,
})
youtubeUrl: string;
@Column({
nullable: false,
})
youtubeId: string;
@Column({
nullable: true,
})
gifUrl: string;
@Column({
nullable: false,
})
startTime: number;
@Column({
nullable: false,
})
endTime: number;
@Column({
type: 'enum',
enum: ['pending', 'processing', 'done', 'error'],
})
status: 'pending' | 'processing' | 'done' | 'error';
@Column()
@CreateDateColumn()
createdAt: Date;
@Column()
@UpdateDateColumn()
updatedAt: Date;
}
Здесь важно обратить внимание на поле status
— по сути, оно является перечислением (enum), указывающим на текущий статус конвертации GIF. Все остальные поля представляют собой стандартные данные, необходимые для выполнения конвертации.
У нас будет только два маршрута.
Маршрут для создания новой задачи по конвертации GIF.
Маршрут для получения данных о задачи преобразования из его идентификатора, который будет использоваться для опроса в дальнейшем на стороне клиента.
Вот как выглядит наш обработчик маршрутов:
//routes.interface
import { Router } from 'express';
interface Route {
path?: string;
router: Router;
}
export default Route;
//jobs.route.ts
import { Router } from 'express';
import { CreateJobDto } from '../../common/dtos/createJob.dto';
import Route from '../../common/interfaces/routes.interface';
import JobsController from '../../controllers/jobs.controller';
import validationMiddleware from '../middlewares/validation.middleware';
class JobsRoute implements Route {
public path = '/jobs';
public router = Router();
constructor(private jobsController = new JobsController()) {
this.initializeRoutes();
}
private initializeRoutes() {
this.router.get(`${this.path}/:id`, this.jobsController.getJobById);
this.router.post(`${this.path}`, validationMiddleware(CreateJobDto, 'body'), this.jobsController.createJob);
}
}
export default JobsRoute;
Для проверки мы используем свой обработчик-валидатор, который проверяет DTO с помощью class-validator и class-transformer
//createJob.dto
import { Expose } from 'class-transformer';
import { IsNotEmpty, IsNumber, IsString, Matches } from 'class-validator';
import { IsGreaterThan } from './validators/isGreaterThan';
import { MaximumDifference } from './validators/maximumDifference';
export class CreateJobDto {
@IsNotEmpty()
@IsString()
@Matches(/^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/, {
message: 'Invalid youtube url',
})
@Expose()
public youtubeUrl: string;
@IsNotEmpty()
@IsNumber()
@Expose()
public startTime: number;
@IsNotEmpty()
@IsNumber()
@IsGreaterThan('startTime', {
message: 'end time must be greater than start time',
})
@MaximumDifference('startTime', {
message: 'maximum gif duration is 30 seconds',
})
@Expose()
public endTime: number;
}
Обратите внимание, что IsGreaterThan
и MaximumDifference
— это пользовательские декораторы валидации для class-validator
. По сути, они выглядят так (подробнее читайте в документации):
//isGreaterThan.ts
import { registerDecorator, ValidationOptions, ValidationArguments } from 'class-validator';
export function IsGreaterThan(property: string, validationOptions?: ValidationOptions) {
return function (object: Object, propertyName: string) {
registerDecorator({
name: 'isGreaterThan',
target: object.constructor,
propertyName: propertyName,
constraints: [property],
options: validationOptions,
validator: {
validate(value: any, args: ValidationArguments) {
const [relatedPropertyName] = args.constraints;
const relatedValue = (args.object as any)[relatedPropertyName];
return typeof value === 'number' && typeof relatedValue === 'number' && value > relatedValue;
},
},
});
};
}
MaximumDifference
выглядит аналогично, но его возврат выглядит следующим образом:
return typeof value === 'number' && typeof relatedValue === 'number' && value - relatedValue <= difference;
И теперь наш обработчик (validation middleware) выглядит следующим образом:
validation.middleware.ts
import { plainToClass } from 'class-transformer';
import { validate, ValidationError } from 'class-validator';
import { RequestHandler } from 'express';
const validationMiddleware = (type: any, value: string | 'body' | 'query' | 'params' = 'body', skipMissingProperties = false): RequestHandler => {
return (req, res, next) => {
validate(plainToClass(type, req[value]), { skipMissingProperties }).then((errors: ValidationError[]) => {
if (errors.length > 0) {
const message = errors.map((error: ValidationError) => Object.values(error.constraints)).join(', ');
res.status(400).send(message);
} else {
next();
}
});
};
};
export default validationMiddleware;
Наш контроллер выглядит довольно стандартно. Единственное, что можно отметить — это извлечение объекта CreateJobDto
из тела с помощью plainToClass
из class-transformer
с excludeExtraneousValues: true
, который уничтожает только помеченные поля (с декоратором @Expose()
в классе CreateJobDto
) Подробнее об этом в документации по class-transformer.
//jobs.controllers.ts
import { plainToClass } from 'class-transformer';
import { NextFunction, Request, Response } from 'express';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import { Job } from '../entities/jobs.entity';
import JobsService from '../services/jobs.service';
class JobsController {
constructor(private jobService = new JobsService()) {}
public createJob = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobDto: CreateJobDto = plainToClass(CreateJobDto, req.body, { excludeExtraneousValues: true });
const createdJob: Job = await this.jobService.createJob(jobDto);
res.status(201).json(createdJob);
} catch (error) {
next(error);
}
};
public getJobById = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobId = req.params.id;
const job: Job = await this.jobService.findJobById(jobId);
const responseStatus = job.status === 'done' ? 200 : 202;
res.status(responseStatus).json(job);
} catch (error) {
next(error);
}
};
}
export default JobsController;
Также стоит отметить, что код состояния ответа [GET] /job/{id} равен 202, если задача на преобразование всё ещё находится в обработке. Подробнее об этом читайте в посте «Асинхронный шаблон запроса-ответа».
В случае возникновения ошибки она передаётся в обработчик ошибок (error middleware), который является последним посредником в нашей цепочке и выглядит следующим образом:
//error.middleware.ts
import { NextFunction, Request, Response } from 'express';
import { isBoom, Boom } from '@hapi/boom';
import { logger } from '../../common/utils/logger';
function errorMiddleware(error: Boom | Error, req: Request, res: Response, next: NextFunction) {
const statusCode: number = isBoom(error) ? error.output.statusCode : 500;
const errorMessage: string = isBoom(error) ? error.message : 'Something went wrong';
logger.error(`StatusCode : ${statusCode}, Message : ${error}`);
return res.status(statusCode).send(errorMessage);
}
export default errorMiddleware;
Вы можете заметить, что мы импортировали пакет под названием Boom — о нём мы поговорим позже в разделе «Сервисы».
JobService
имеет всю бизнес-логику и доступ к слою Data Access, а также взаимодействует с RabbitMQ Service для отправки событий в очередь.
//jobs.service.ts
import * as Boom from '@hapi/boom';
import Container from 'typedi';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import EventEmitter from '../common/utils/eventEmitter';
import { Job } from '../entities/jobs.entity';
import RabbitMQService from './rabbitmq.service';
class JobsService {
private events = {
JobCreated: 'JobCreated',
};
constructor() {
this.intiializeEvents();
}
private intiializeEvents() {
EventEmitter.on(this.events.JobCreated, (job: Job) => {
const rabbitMQInstance = Container.get(RabbitMQService);
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
public async findJobById(jobId: string): Promise<Job> {
const job: Job = await Job.findOne(jobId);
if (!job) throw Boom.notFound();
return job;
}
public async createJob(jobDto: CreateJobDto): Promise<Job> {
const createdJob: Job = await Job.save({ ...jobDto, youtubeId: jobDto.youtubeUrl.split('v=')[1]?.slice(0, 11), status: 'pending' } as Job);
EventEmitter.emit(this.events.JobCreated, createdJob);
return createdJob;
}
}
export default JobsService;
Сразу же вы увидите два импорта, с которыми вы можете быть незнакомы — мы быстро пройдёмся по ним, а затем подробно объясним каждую функцию в этом классе.
Boom
Используется для создания http-объектов с мощным, простым и дружелюбным интерфейсом.
TypeDI
TypeDI — это мощный пакет для инъекции зависимостей, который обладает множеством фичей. Одной из таких фич является наличие Singleton Services, что мы и используем в нашем случае.
Теперь давайте подробнее рассмотрим некоторые функции в классе.
Эта функция использует глобальный EventEmitter
, который мы используем во всём проекте, чтобы добавить слой pub/sub.
это так же просто, как
//eventEmitter.ts
import { EventEmitter } from 'events';
export default new EventEmitter();
и теперь мы можем начать прослушивать события. В частности, событие, которое мы создадим позже при создании новой задачи под названием 'JobCreated'
// Defines all the events in our service
private events = {
JobCreated: 'JobCreated',
};
private intiializeEvents() {
// Start listening for the event 'JobCreated'
EventEmitter.on(this.events.JobCreated, (job: Job) => {
// Get a singleton instance of our RabbitMQService
const rabbitMQInstance = Container.get(RabbitMQService);
// Dispatch an event containing the data of the created job
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
Подробнее читайте в посте «Добавление слоя Pub/Sub в Express бэкенд».
Эта функция:
Создаёт новый документ задачи в базе данных.
Отправляет событие 'JobCreated' о том, что была создана новая задача. Таким образом, слушатель события (event listener) будет обрабатывать логику отправки этого события в службу RabbitMQ.
Этот сервис отвечает за подключение к серверу RabbitMQ, создание канала и инициализацию очереди, которая будет использоваться для создания задач (и которые будут потребляться нашим Service Worker-ом).
В качестве клиента для RabbitMQ Server используется amqplib.
//rabbitmq.service.ts
import { Service } from 'typedi';
import amqp, { Channel, Connection } from 'amqplib';
import { logger } from '../common/utils/logger';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
} catch (err) {
logger.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
logger.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
logger.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
logger.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async sendToQueue(message: string) {
this.channel.sendToQueue(this.queueName, Buffer.from(message), {
persistent: true,
});
logger.info(`sent: ${message} to queue ${this.queueName}`);
}
}
Код для настройки соединения / каналов / очередей довольно стандартен. Подробнее ознакомиться с этими функциями можно в документации RabbitMQ или anqplib. Единственная функция, которую нам нужно будет использовать вне этого класса — это sendToQueue()
. Она используется для отправки сообщения в нашу очередь задач, как показано в JobService, путём отправки строкового объекта Job
.
rabbitMQInstance.sendToQueue(JSON.stringify(job));
Теперь нам нужно только инициализировать RabbitMQ Service в начале работы приложения следующим образом:
import Container from 'typedi';
// Call initializeRabbitMQ() somewhere when starting the app
private initializeRabbitMQ() {
Container.get(RabbitMqService);
}
Работа нашего бэкенд-сервиса завершена. Осталость только, чтобы Node Service Worker использовал очередь задач и выполнил фактическое преобразование GIF. Давайте рассмотрим, как это сделать.
Как видите, Service Worker отвечает за:
Потребление задач из очереди задач;
Преобразование части видео на YouTube в GIF;
Загрузку GIF в облачное хранилище;
Обновление gifUrl и статуса задачи в базе данных.
Эта блок-схема наглядно показывает работу Service Worker-а.
RabbitMQ Service в Service Worker-е похож на RabbitMQ Service на бэкенд-сервере, за исключением одной единственной функции — startConsuming()
//rabbitmq.service.ts
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib';
import Container, { Service } from 'typedi';
import { Job } from '../entities/jobs.entity';
import ConversionService from './conversion.service';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
await this.startConsuming();
} catch (err) {
console.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
console.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
console.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
console.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async startConsuming() {
const conversionService = Container.get(ConversionService);
this.channel.prefetch(1);
console.info(' 🚀 Waiting for messages in %s. To exit press CTRL+C', this.queueName);
this.channel.consume(
this.queueName,
async (msg: ConsumeMessage | null) => {
if (msg) {
const job: Job = JSON.parse(msg.content.toString());
console.info(`Received new job 📩 `, job.id);
try {
await conversionService.beginConversion(
job,
() => {
this.channel.ack(msg);
},
() => {
this.channel.reject(msg, false);
},
);
} catch (err) {
console.error('Failed to process job', job.id, err);
}
}
},
{
noAck: false,
},
);
}
}
startConsuming()
потребляет сообщение из очереди, разбирает его JSON-объект и делегирует процесс конвертации в ConversionService
.
Всё, что нужно ConversionService
для выполнения преобразования, — это объект Job
, а также два коллбэка, используемых для подтверждения или отклонения сообщения из очереди (рассмотрим ниже).
Также обратите внимание, что в этом примере мы используем
this.channel.prefetch(1);
Об этом мы также поговорим немного позже.
Чтобы удалить задачу из очереди (что свидетельствует о том, что сервис успешно обработал задачу либо отрицательно, либо положительно), нам нужно сделать ручное подтверждение.
Это можно сделать в amqplib, используя либо
channel.ack(msg);
Чтобы указать на положительное подтверждение сообщения.
Либо
// Второй параметр указывает, следует ли повторно поставить сообщение в очередь или нет
channel.reject(msg, false);
чтобы указать на отрицательное подтверждение сообщения.
Обратите внимание, что в случае ошибки мы не возвращаем сообщение в очередь, а считаем его «неудачным преобразованием». Но это можно оставить на усмотрение программиста.
Подробнее о подтверждении сообщений в RabbitMQ можно прочитать здесь.
Этот сервис содержит основную логику нашего Service Worker-а.
Он раскрывает функцию beginConversion()
, которая вызывается из службы RabbitMQ при получении сообщения
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
...
}
Эта функция выполнит все шаги, необходимые для преобразования, а затем вызовет либо onSuccess()
, либо onError()
в зависимости от успеха или неудачи.
Шаги, необходимые для преобразования видео с YouTube в GIF:
Загрузка видео с YouTube.
Видео с YouTube загружается локально.
Преобразование загруженного видео в GIF.
Видео конвертируется в GIF (конвертируется только выбранный диапазон по времени начала/окончания).
Загрузка GIF в облачное хранилище Google.
Обновление базы данных.
Вызывается onSuccess() или onError() соответственно.
Начнём с локальной загрузки видео с YouTube.
Чтобы загрузить видео с YouTube локально, мы используем пакет ytdl-core, предназначенный для этой задачи.
За это отвечает функция downloadVideo()
, которая принимает URL/ID видео и возвращает ReadableStream, который можно использовать для сохранения видеофайла локально, а также его расширения, например: mp4, avi... и так далее.
//conversion.service.ts
import { Readable } from 'stream';
import ytdl from 'ytdl-core';
import YoutubeDownload from '../common/interfaces/YoutubeDownload';
private async downloadVideo({ youtubeId, youtubeUrl }: YoutubeDownload): Promise<{ video: Readable ; formatExtension: string }> {
const info = await ytdl.getInfo(youtubeId);
const format: ytdl.videoFormat = info.formats[0];
if (!format) throw new Error('No matching format found');
const video = ytdl(youtubeUrl, {
format,
});
return { video, formatExtension: format.container };
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
console.info('Started Processing Job :', job.id);
const { video, formatExtension } = await this.downloadVideo({
youtubeId: job.youtubeId,
youtubeUrl: job.youtubeUrl,
});
const srcFileName = `./src/media/temp.${formatExtension}`;
video.on('progress', (chunkLength, downloaded, total) => {
//... Logic for showing progress to the user..i.e progress bar
});
video.pipe(
fs
.createWriteStream(srcFileName)
.on('open', () => {
//Video download started
console.log('Downloading Video');
})
.on('finish', async () => {
//Video finished downloading locally in srcFileName
console.info('Downloaded video for job ', job.id);
//...Logic for converting the locally downloaded video to GIF
})
.on('error', async () => {
//...handle failure logic
}),
);
} catch (err) {
//...handle failure logic
}
}
Для преобразования локального видео в GIF мы будем использовать ffmpeg.wasm, который по сути является Webassembly-портом FFmpeg. Этот процесс можно представить себе как асинхронное использование FFmpeg внутри ноды для выполнения преобразования. Никаких порождений внешних процессов, никаких зависимых инструментов и так далее — что одновременно очень мощно и просто.
//conversion.service.ts
import { createFFmpeg, fetchFile, FFmpeg } from '@ffmpeg/ffmpeg';
import GifConversion from '../common/interfaces/GifConversion';
//...somewhere in our code
const ffmpeg = createFFmpeg({
log: false,
progress: p => {
progressBar.update(Math.floor(p.ratio * 100));
},
});
await ffmpeg.load();
//Converts a video range to GIF from srcFileName to destFileName
private async convertToGIF({ startTime, endTime, srcFileName, destFileName, formatExtension }: GifConversion) {
try {
console.info('Converting Video to GIF');
this.ffmpeg.FS('writeFile', `temp.${formatExtension}`, await fetchFile(srcFileName));
await this.ffmpeg.run(
'-i',
`temp.${formatExtension}`,
'-vcodec',
'gif',
'-ss',
`${startTime}`,
'-t',
`${endTime - startTime}`,
'-vf',
'fps=10',
`temp.gif`,
);
await fs.promises.writeFile(destFileName, this.ffmpeg.FS('readFile', 'temp.gif'));
console.info('Converted video to gif');
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
console.info('Started Processing Job :', job.id);
const srcFileName = `./src/media/temp.${formatExtension}`;
const destFileName = `./src/media/temp.gif`;
//... Video download logic
// GIF Conversion
await this.convertToGIF({
startTime: job.startTime,
endTime: job.endTime,
srcFileName,
destFileName,
formatExtension,
});
} catch (err) {
//...handle failure logic
}
}
После того как локальный видеофайл был сконвертирован в GIF, можно загрузить его в облачное хранилище Google.
Сначала создадим службу CloudStorageService
, которая будет отвечать именно за это.
В нашем случае используем Google Cloud Storage.
import { Storage } from '@google-cloud/storage';
import * as _ from 'lodash';
import { Service } from 'typedi';
@Service()
class CloudStorageService {
private storage;
private BUCKET_NAME;
constructor() {
const privateKey = _.replace(process.env.GCS_PRIVATE_KEY, new RegExp('\\\\n', 'g'), '\n');
this.BUCKET_NAME = 'yourbucketname';
this.storage = new Storage({
projectId: process.env.GCS_PROJECT_ID,
credentials: {
private_key: privateKey,
client_email: process.env.GCS_CLIENT_EMAIL,
},
});
}
async uploadGif(gifImage: Buffer, uploadName: string) {
try {
const bucket = await this.storage.bucket(this.BUCKET_NAME);
uploadName = `ytgif/${uploadName}`;
const file = bucket.file(uploadName);
await file.save(gifImage, {
metadata: { contentType: 'image/gif' },
public: true,
validation: 'md5',
});
return `https://storage.googleapis.com/${this.BUCKET_NAME}/${uploadName}`;
} catch (err) {
throw new Error('Something went wrong while uploading image');
}
}
}
export default CloudStorageService;
Теперь можно использовать его для загрузки сгенерированного GIF.
//conversion.service.ts
import Container from 'typedi';
import CloudStorageService from './cloudStorage.service';
private async uploadGifToCloudStorage(destFileName, uploadName): Promise<string> {
try {
console.info('Uploading gif to cloud storage');
const gifImage = await fs.promises.readFile(destFileName);
const cloudStorageInstance = Container.get(CloudStorageService);
const gifUrl = await cloudStorageInstance.uploadGif(gifImage, `gifs/${uploadName}`);
return gifUrl;
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
const destFileName = `./src/media/temp.gif`;
//... Video download logic
//... Video conversion logic
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
} catch (err) {
//...handle failure logic
}
}
Она довольно проста. Сначала мы должны обновить задачу в базе данных.
В случае успеха:
Установите статус задачи на 'done' и обновите gifUrl на загруженный GIF в Google Cloud Storage.
В случае неудачи:
Установите статус задачи на 'error'.
После этого мы вызовем функцию onSuccess()
или onError()
, которая, по сути, будет обрабатывать положительное / отрицательное подтверждение сообщения RabbitMQ.
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
const destFileName = `./src/media/temp.gif`;
//... Video download logic
//... Video conversion logic
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
//Success scenario
await this.jobService.updateJobById(job.id as any, { status: 'done', gifUrl });
console.info(`Finished job ${job.id}, gif at ${gifUrl}`);
onSuccess();
} catch (err) {
//Failure scenario
console.error('Failed to process job', job.id);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}
}
Если собрать всё вместе, а также добавить шкалу прогресса выполнения с помощью cli-progress, то ConversionService
будет выглядеть следующим образом:
import Container, { Service } from 'typedi';
import JobsService from './jobs.service';
import ytdl from 'ytdl-core';
import { Readable } from 'stream';
import { Job } from '../entities/jobs.entity';
import { createFFmpeg, fetchFile, FFmpeg } from '@ffmpeg/ffmpeg';
import fs from 'fs';
import cliProgress from 'cli-progress';
import CloudStorageService from './cloudStorage.service';
import GifConversion from '../common/interfaces/GifConversion';
import YoutubeDownload from '../common/interfaces/YoutubeDownload';
const progressBar = new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic);
@Service()
export default class ConversionService {
private ffmpeg: FFmpeg = null;
constructor(private jobService = new JobsService()) {}
public async initializeService() {
try {
this.ffmpeg = createFFmpeg({
log: false,
progress: p => {
progressBar.update(Math.floor(p.ratio * 100));
},
});
await this.ffmpeg.load();
} catch (err) {
console.error(err);
}
}
private async downloadVideo({ youtubeId, youtubeUrl }: YoutubeDownload): Promise<{ video: Readable; formatExtension: string }> {
const info = await ytdl.getInfo(youtubeId);
const format: ytdl.videoFormat = info.formats[0];
if (!format) throw new Error('No matching format found');
const video = ytdl(youtubeUrl, {
format,
});
return { video, formatExtension: format.container };
}
private async convertToGIF({ startTime, endTime, srcFileName, destFileName, formatExtension }: GifConversion) {
try {
console.info('Converting Video to GIF');
this.ffmpeg.FS('writeFile', `temp.${formatExtension}`, await fetchFile(srcFileName));
progressBar.start(100, 0);
await this.ffmpeg.run(
'-i',
`temp.${formatExtension}`,
'-vcodec',
'gif',
'-ss',
`${startTime}`,
'-t',
`${endTime - startTime}`,
'-vf',
'fps=10',
`temp.gif`,
);
progressBar.stop();
await fs.promises.writeFile(destFileName, this.ffmpeg.FS('readFile', 'temp.gif'));
console.info('Converted video to gif');
} catch (err) {
throw err;
}
}
private async uploadGifToCloudStorage(destFileName, uploadName): Promise<string> {
try {
console.info('Uploading gif to cloud storage');
const gifImage = await fs.promises.readFile(destFileName);
const cloudStorageInstance = Container.get(CloudStorageService);
const gifUrl = await cloudStorageInstance.uploadGif(gifImage, `gifs/${uploadName}`);
return gifUrl;
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
await this.jobService.updateJobById(job.id as any, { status: 'processing' });
console.info('Started Processing Job :', job.id);
const { video, formatExtension } = await this.downloadVideo({
youtubeId: job.youtubeId,
youtubeUrl: job.youtubeUrl,
});
const srcFileName = `./src/media/temp.${formatExtension}`;
const destFileName = `./src/media/temp.gif`;
video.on('progress', (chunkLength, downloaded, total) => {
let percent: any = downloaded / total;
percent = percent * 100;
progressBar.update(percent);
});
video.pipe(
fs
.createWriteStream(srcFileName)
.on('open', () => {
console.log('Downloading Video');
progressBar.start(100, 0);
})
.on('finish', async () => {
progressBar.stop();
console.info('Downloaded video for job ', job.id);
await this.convertToGIF({
startTime: job.startTime,
endTime: job.endTime,
srcFileName,
destFileName,
formatExtension,
});
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
await this.jobService.updateJobById(job.id as any, { status: 'done', gifUrl });
console.info(`Finished job ${job.id}, gif at ${gifUrl}`);
onSuccess();
})
.on('error', async () => {
progressBar.stop();
console.error('Failed to process job', job.id);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}),
);
} catch (err) {
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
throw err;
}
}
}
Вспомните, как мы использовали channel.prefetch(1)
, когда начали потребление из очереди
this.channel.prefetch(1);
Это позволяет убедиться, что каждый потребитель очереди получает только одно сообщение за раз. Это гарантирует, что нагрузка будет равномерно распределена между нашими потребителями, и всякий раз, когда потребитель освободится, он будет готов обрабатывать другие задачи.
Подробнее об этом можно прочитать в RabbitMQ Docs.
Это также означает, что если мы захотим масштабировать задачи на конвертацию / Service Worker-ы, мы можем добавить больше копий этого сервиса.
Подробнее об этом написано в заметке «Конкурирующие потребители».
Вот и всё для нашего Service Worker-а! Теперь мы можем начать копаться в клиентской части приложения.
Нам осталось рассмотреть, как реализовать Next.js Client, который будет отправлять запросы на конвертацию и просматривать сконвертированные GIF.
Клиентская часть нашего приложения очень проста, она должна делать только две вещи:
Предоставить интерфейс для создания запросов на конвертацию.
Предоставить страницу, которая будет опрашивать задачу на конвертацию GIF и просматривать сгенерированный GIF, когда задача выполнена.
Давайте сразу перейдём к первой задаче — созданию главной страницы.
Минимально эта страница должна иметь:
Поля ввода, содержащие:
URL видео
время начала GIF
время окончания GIF
Встроенный проигрыватель YouTube с превью выбранного видео и предварительным просмотром выбранного временного диапазона.
Две кнопки: для предварительного просмотра текущего выбора и для отправки текущего выбора для создания GIF.
Начнём с создания трёх необходимых полей ввода и их соответствующих состояний.
// pages/index.tsx
import React, { useState, useMemo } from 'react';
const Home: React.FC = () => {
const [youtubeUrl, setYoutubeUrl] = useState("");
const [startTime, setStartTime] = useState("");
const [endTime, setEndTime] = useState("");
const validYoutubeUrl = useMemo(() => {
const youtubeUrlRegex = /^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/;
return youtubeUrl.match(youtubeUrlRegex);
}, [youtubeUrl]);
return (
<>
<input
className={`input ${youtubeUrl === "" ? "is-dark" : validYoutubeUrl? "is-success": "is-danger" }`}
type="text"
placeholder="Youtube URL, eg: https://www.youtube.com/watch?v=I-QfPUz1es8"
value={youtubeUrl}
onChange={(e) => {
setYoutubeUrl(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="Start Second, eg: 38"
value={startTime}
onChange={(e) => {
setStartTime(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="End Second, eg: 72"
value={endTime}
onChange={(e) => {
setEndTime(e.target.value);
}}
/>
</>
)
}
Обратите внимание, что мы проверяем валидность URL-адреса с помощью регулярных выражений. Это не обязательно, но используется для обеспечения хорошей визуальной обратной связи, а также для условного отображения встроенного YouTube плеера позже, чтобы избежать показа пустого проигрывателя.
Теперь пришло время добавить встроенный YouTube проигрыватель.
Мы будем использовать проигрыватель из react-youtube
// pages/index.tsx
import React, { useState, useMemo } from 'react';
import YouTube from "react-youtube";
const Home: React.FC = () => {
// ...code from before
const [ytPlayer, setYtPlayer] = useState(null);
const ytVideoId = useMemo(() => {
return youtubeUrl.split("v=")[1]?.slice(0, 11);
}, [youtubeUrl]);
return (
<>
<div className="content">
{validYoutubeUrl ? (
<>
<h3>Preview</h3>
<YouTube
videoId={ytVideoId}
opts={{
playerVars: {
start: Number(startTime),
end: Number(endTime),
autoplay: 0,
},
}}
onReady={(e) => {
setYtPlayer(e.target);
}}
/>
</>
) : (
<h4>No Youtube Video Link Selected</h4>
)}
</div>
</>
)
}
Обратите внимание, что мы инициализировали состояние ytPlayer
объектом event target. Мы используем его позже, чтобы программно управлять плеером — в частности, когда будем добавлять кнопку предварительного просмотра.
Теперь пришло время добавить две кнопки: Preview и Generate.
Предварительный просмотр (Preview): Используется для воспроизведения выбранной части видео, чтобы дать пользователю представление о том, как будет выглядеть GIF.
Сгенерировать (Generate): Используется для отправки фактического запроса на конвертацию GIF, то есть запуска фактической конвертации.
// pages/index.tsx
import React, { useState } from 'react';
import axios from "axios";
import { useRouter } from "next/router";
const Home: React.FC = () => {
// ... code from before
const router = useRouter();
const [loading, setLoading] = useState(false);
const submitYoutubeVideo = async () => {
setLoading(true);
try {
const response = await axios.post(
`${process.env.NEXT_PUBLIC_BASE_URL}/api/v1/jobs`,
{
youtubeUrl,
startTime: Number(startTime),
endTime: Number(endTime),
},
{}
);
router.push(`/jobs/${response.data.id}`);
} catch (err) {
alert(err?.response?.data?.message || "Something went wrong");
}
setLoading(false);
};
return (
<>
<button
className="button is-black"
onClick={() => {
if (ytPlayer)
ytPlayer.loadVideoById({
videoId: ytVideoId,
startSeconds: Number(startTime),
endSeconds: Number(endTime),
});
}}
>
Preview
</button>
<button
className={`button is-black is-outlined ${loading ? "is-loading" : ""}`}
onClick={submitYoutubeVideo}
>
Generate GIF
</button>
</>
)
}
// pages/index.tsx
import axios from "axios";
import { useRouter } from "next/router";
import React, { useMemo, useState } from "react";
import YouTube from "react-youtube";
const Home: React.FC = () => {
const router = useRouter();
const [youtubeUrl, setYoutubeUrl] = useState("");
const [startTime, setStartTime] = useState("");
const [endTime, setEndTime] = useState("");
const [loading, setLoading] = useState(false);
const [ytPlayer, setYtPlayer] = useState(null);
const validYoutubeUrl = useMemo(() => {
const youtubeUrlRegex = /^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/;
return youtubeUrl.match(youtubeUrlRegex);
}, [youtubeUrl]);
const ytVideoId = useMemo(() => {
return youtubeUrl.split("v=")[1]?.slice(0, 11);
}, [youtubeUrl]);
const submitYoutubeVideo = async () => {
setLoading(true);
try {
const response = await axios.post(
`${process.env.NEXT_PUBLIC_BASE_URL}/api/v1/jobs`,
{
youtubeUrl,
startTime: Number(startTime),
endTime: Number(endTime),
},
{}
);
router.push(`/jobs/${response.data.id}`);
} catch (err) {
console.log(err);
alert(err?.response?.data?.message || "Something went wrong");
}
setLoading(false);
};
return (
<>
{validYoutubeUrl ? (
<>
<h3>Preview</h3>
<YouTube
videoId={ytVideoId}
opts={{
playerVars: {
start: Number(startTime),
end: Number(endTime),
autoplay: 0,
},
}}
onReady={(e) => {
setYtPlayer(e.target);
}}
/>
</>
) : (
<h4>No Youtube Video Link Selected</h4>
)}
<input
className={`input ${youtubeUrl === ""? "is-dark": validYoutubeUrl? "is-success": "is-danger"}`}
type="text"
placeholder="Youtube URL, eg: https://www.youtube.com/watch?v=I-QfPUz1es8"
value={youtubeUrl}
onChange={(e) => {
setYoutubeUrl(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="Start Second, eg: 38"
value={startTime}
onChange={(e) => {
setStartTime(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="End Second, eg: 72"
value={endTime}
onChange={(e) => {
setEndTime(e.target.value);
}}
/>
<button
className={`button is-black`}
onClick={() => {
if (ytPlayer)
ytPlayer.loadVideoById({
videoId: ytVideoId,
startSeconds: Number(startTime),
endSeconds: Number(endTime),
});
}}
>
Preview
</button>
<button
className={`button is-black is-outlined ${loading ? "is-loading" : ""}`}
onClick={submitYoutubeVideo}
>
Generate GIF
</button>
</>
);
};
export default Home;
Здесь мы хотим периодически получать данные о задачи преобразования GIF из бэкенда. Это известно как частые опросы (polling / поллинг).
Для этого мы будем использовать swr — библиотеку сбора данных для React. Она не обязательно используется для частого опроса, но у неё есть хороший API, который частый опрос поддерживает. Существуют и другие библиотеки для получения данных с похожими возможностями, в частности React Query. Также можно выполнять опрос с помощью axios (используя таймауты). Однако такие библиотеки сбора данных, как swr и React Query, имеют хуки для сбора данных — это улучшает опыт разработки, а также предоставляет другие возможности (например, кэширование).
Сначала мы должны предоставить функцию выборки данных:
import axios from "axios";
import Job from "../../common/interfaces/Job.interface";
export default async function fetchJobById(jobId: string): Promise<Job> {
try {
const response = await axios.get(
`${process.env.NEXT_PUBLIC_BASE_URL}/api/v1/jobs/${jobId}`
);
return response.data;
} catch (err) {
if (err.response?.status === 404) window.location.href = "/404";
throw err;
}
}
Затем можно использовать это вместе с swr для опроса задачи по преобразованию GIF:
// pages/jobs/[id].tsx
import { useRouter } from "next/router";
import React from "react";
import useSWR from "swr";
import Job from "../../lib/common/interfaces/Job.interface";
import fetchJobById from "../../lib/requests/fetchers/jobById";
export default function JobPage() {
const router = useRouter()
const { jobId } = router.query
const [jobDone, setJobDone] = React.useState(false);
const { data: job, error: errorJob, isValidating: isValidatingJob } = useSWR(
[`/api/jobs/${jobId}`, jobId],
async (url, jobId) => await fetchJobById(jobId),
{
initialData: null,
revalidateOnFocus: false,
// job will be polled from the backend every 2 seconds until its status change to 'done'
refreshInterval: jobDone ? 0 : 2000,
}
);
React.useEffect(() => {
if (job?.status === "done") setJobDone(true);
}, [job]);
const loadingJob = !job;
return (
<>
{/* rendering logic */}
</>
);
}
Обратите внимание, что в этом фрагменте кода refreshInterval
— это частота опроса данных с бэкенда. Мы использовали булево состояние, которое будет отслеживать статус задачи, и как только она будет выполнена, мы прекратим частый опрос бэкенда.
Мы можем использовать рендеринг Next на стороне сервера для динамического получения ID из URL, а также для первоначальной выборки задачи один раз перед загрузкой страницы.
Для этого мы используем getServerSideProps()
Подробнее об этом смотрите в документации Next.js.
// pages/jobs/[id].tsx
// ...other imports
import { InferGetServerSidePropsType } from "next";
export const getServerSideProps = async (context) => {
const jobId = context.params.id;
try {
const initialJob: Job = await fetchJobById(jobId);
return { props: { jobId, initialJob: initialJob } };
} catch (err) {
return { props: { jobId, initialJob: null } };
}
};
export default function JobPage({
jobId,
initialJob,
}: InferGetServerSidePropsType<typeof getServerSideProps>) {
//...other code
const { data: job, error: errorJob, isValidating: isValidatingJob } = useSWR(
[`/api/jobs/${jobId}`, jobId],
async (url, jobId) => await fetchJobById(jobId),
{
// use initialJob instead of null
initialData: initialJob,
revalidateOnFocus: false,
refreshInterval: jobDone ? 0 : 2000,
}
);
return (
<>
{/* rendering logic */}
</>
);
}
Обратите внимание, что мы использовали initialJob
в свойстве initialData
в swr опциях.
// pages/jobs/[id].tsx
import { InferGetServerSidePropsType } from "next";
import React from "react";
import useSWR from "swr";
import Job from "../../lib/common/interfaces/Job.interface";
import fetchJobById from "../../lib/requests/fetchers/jobById";
export default function JobPage({
jobId,
initialJob,
}: InferGetServerSidePropsType<typeof getServerSideProps>) {
const [jobDone, setJobDone] = React.useState(false);
const { data: job, error: errorJob, isValidating: isValidatingJob } = useSWR(
[`/api/jobs/${jobId}`, jobId],
async (url, jobId) => await fetchJobById(jobId),
{
initialData: initialJob,
revalidateOnFocus: false,
refreshInterval: jobDone ? 0 : 2000,
}
);
React.useEffect(() => {
if (job?.status === "done") setJobDone(true);
}, [job]);
const loadingJob = !job;
return (
<>
{loadingJob ? (
<>
<h4>Getting conversion status..</h4>
<progress className="progress is-medium is-dark" max="100">
45%
</progress>
</>
) : (
<div className="content">
{job.status === "error" ? (
<h4 style={{ color: "#FF0000" }}>Conversion Failed</h4>
) : job.status === "done" ? (
<>
{!job.gifUrl ? (
<h4 style={{ color: "#FF0000" }}>Conversion Failed</h4>
) : (
<>
<h4>Gif</h4>
<img src={job.gifUrl}></img>
<h6>
GIF Url : <a href={job.gifUrl}>{job.gifUrl}</a>
</h6>
<h6>
Converted from :
<a href={job.youtubeUrl}>{job.youtubeUrl}</a>
</h6>
</>
)}
</>
) : (
<>
<h4>Working..</h4>
<h5>Conversion Status : {job.status}</h5>
<progress className="progress is-medium is-dark" max="100">
45%
</progress>
</>
)}
</div>
)}
</>
);
}
export const getServerSideProps = async (context) => {
const jobId = context.params.id;
try {
const initialJob: Job = await fetchJobById(jobId);
return { props: { jobId, initialJob: initialJob } };
} catch (err) {
return { props: { jobId, initialJob: null } };
}
};
На этом туториал завершаем и надеемся, что вы узнали что-то новое. Полный исходный код можно посмотреть в github репозитории.
В заключение приглашаем всех желающих на открытый урок "React Testing Library", который пройдёт сегодня вечером. На нём научимся:
Применять на практике подход к разработке компонентов на React.js через тестирование. Писать модульные тесты для компонентов, контролировать корректность обработки событий и документировать техническое решение.
Упрощать рефакторинг, сокращать количество ошибок на проде и упрощать командную работу.
Использовать react testing library и jest. Описывать тестовые сценарии.
Записаться на урок можно на странице онлайн-курса "React.js Developer".