Очереди могут пригодиться для сглаживания пиков обработки для задач, обработка которых может занять время или блокировать цикл событий node.js.
Испытание
Допустим, у нас есть конечная точка, которая обрабатывает некоторые данные, что занимает более нескольких секунд; это могут быть сложные вычисления, обработка видео, перекодирование звука и т. д. Если несколько пользователей одновременно отправляют запросы к этой конечной точке, это может заблокировать цикл событий, и пользователи не получат своевременный ответ, а не получат постоянно загружаемую страницу.
Таким образом, один из способов решения этой проблемы — поддерживать очередь, которая может хранить поступающие запросы и обрабатывать их асинхронно.
Мы бы построили это в рамках NestJS, используя Redis для поддержки очередей через пакет @nestjs/bull
и сторонний модуль для мониторинга очередей, чтобы мы могли следить за состоянием наших очередей.
Предварительные условия
- Клиент NestJS установлен — https://docs.nestjs.com/cli/overview
- Докер установлен — https://www.docker.com/
Настраивать
1- Создайте новое приложение NestJS, используя CLI.
nest new testqueues # In this demo we have used yarn in the next step # where it asks for the package manager selection
2- Установить зависимости
Выполните приведенные ниже команды в папке приложения, т. е. testqueues
# Install nestjs bull package yarn add @nestjs/bull bull # Install bull-board package which will be used for queues monitoring yarn add @bull-board/api @bull-board/express @bull-board/nestjs
Докеризовать
Мы будем использовать Docker для запуска приложения, откроем каталог приложения в вашей любимой IDE, а затем создадим Dockerfile
со следующим содержимым:
FROM node:18-alpine # Set working directory WORKDIR /usr/src/app # Install NestJS globally RUN npm install -g @nestjs/cli # Copy package.json and package-lock.json to working directory COPY package*.json yarn.lock ./ # Install dependencies RUN yarn install --frozen-lockfile # Copy source code to working directory COPY . . # Expose the default port for the NestJS application EXPOSE 3000 # Start the NestJS application in development mode CMD ["npm", "run", "start:dev"]
Обратите внимание, что этот Dockerfile
подходит только для среды разработки
Теперь создайте файл docker-compose.yml
со следующим содержимым:
version: '3.9' services: app: build: context: ./ ports: - '3000:3000' volumes: - ./:/usr/src/app command: npm run start:dev redis: container_name: redis_testqueues image: 'redis:latest' ports: - '6379:6379'
С помощью приведенного выше файла компоновки мы создаем две службы; app
для нашего приложения Nestjs и redis
для запуска Redis с открытым портом 6379. Если какой-либо из портов недоступен на вашем компьютере, измените его здесь, а затем отразите это соответствующим образом в коде приложения.
Запустите команду ниже, чтобы запустить приложение:
docker-compose up
Давайте кодировать
Создайте файл src/constants.ts
с константой для хранения имени нашей очереди Redis:
export const REDIS_QUEUE_NAME = 'qdataprocess';
Затем импортируйте необходимые модули в src/app.module.ts
, добавьте следующее в массив импорта (который, скорее всего, сейчас будет пустым):
imports: [ BullModule.forRoot({ redis: { host: 'redis_testqueues', // use the service name from docker-compose if its different port: 6379, }, }), BullModule.registerQueue({ name: REDIS_QUEUE_NAME, }), BullBoardModule.forRoot({ route: '/queues', adapter: ExpressAdapter, // Or FastifyAdapter from `@bull-board/fastify` }), BullBoardModule.forFeature({ name: REDIS_QUEUE_NAME, adapter: BullAdapter, }), ],
Добавьте импорт для добавленных нами модулей:
import { ExpressAdapter } from '@bull-board/express'; import { BullBoardModule } from '@bull-board/nestjs'; import { BullModule } from '@nestjs/bull'; import { BullAdapter } from '@bull-board/api/bullAdapter'; import { REDIS_QUEUE_NAME } from './constants';
Теперь, когда необходимые модули импортированы, давайте обновим файл app.service.ts
и добавим метод для обработки заданий:
import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { REDIS_QUEUE_NAME } from './constants'; import { Queue } from 'bull'; @Injectable() export class AppService { constructor(@InjectQueue(REDIS_QUEUE_NAME) private queue: Queue) {} getHello(): string { return 'Hello World!'; } async processData() { return await this.queue.add( 'process_data', { custom_id: Math.floor(Math.random() * 10000000) }, { priority: 1 }, ); } }
В основном мы добавили метод processData()
, который впоследствии использует queue.add
, первый аргумент — это уникальное имя для каждого задания, а второй — данные; здесь мы просто отправляем несколько случайных чисел, чтобы продемонстрировать, как данные могут быть переданы, а затем получены позже потребителем этого задания (мы доберемся до этого чуть позже)
Теперь обновите файл app.controller.ts
, добавим маршрут /process
, который затем будет использовать наш сервисный метод:
import { Controller, Get } from '@nestjs/common'; import { AppService } from './app.service'; @Controller() export class AppController { constructor(private readonly appService: AppService) {} @Get() getHello(): string { return this.appService.getHello(); } @Get('process') async processData() { return this.appService.processData(); } }
Итак, теперь наше приложение способно получать задания на конечной точке /process
, мы добавим потребителя, который в основном обрабатывает полученные задания и обрабатывает их.
Добавьте файл process-data.consumer.ts
со следующим содержимым:
import { Processor, Process, OnQueueActive, OnQueueCompleted, } from '@nestjs/bull'; import { Job } from 'bull'; import { REDIS_QUEUE_NAME } from './constants'; import { Logger } from '@nestjs/common'; @Processor(REDIS_QUEUE_NAME) export class ProcessDataConsumer { @Process('process_data') async processData() { // Perform the job // This is just a sample long running process // will take between 5 to 10 seconds to finish await new Promise((resolve, reject) => { try { setTimeout( () => { resolve('Data processed'); }, 5000 + Math.floor(Math.random() * 5000), ); } catch (error) { reject(error); } }); return { done: true }; } @OnQueueActive() onActive(job: Job<unknown>) { // Log that job is starting Logger.log(`Starting job ${job.id} : ${job.data['custom_id']}`); } @OnQueueCompleted() onCompleted(job: Job<unknown>) { // Log job completion status Logger.log(`Job ${job.id} has been finished`); } }
Ключевые моменты, на которые следует обратить внимание:
@Processor(REDIS_QUEUE_NAME)
помечает наш текущий класс как потребителя зарегистрированной нами очереди Redis.process_data
внутри@Process(‘process_data’)
— это имя задания, которое мы определили в нашем сервисном методе.processData()
— это основная функция, которая отвечает за обработку наших данных. Это может быть что угодно, например отправка электронного письма, выполнение некоторых операций с данными, обработка видеокадров, создание миниатюр и т. д. Для демонстрационных целей мы просто создаем обещание, которое будет решено в течение от 5 до 10 секунд
Теперь нам нужно зарегистрировать этого потребителя, для этого отредактируйте файл app.module.ts
и добавьте ProcessDataConsumer
в массив поставщиков (не забудьте также импортировать потребителя):
providers: [AppService, ProcessDataConsumer],
Наконец, вы можете протестировать приложение, посетив http://localhost:3000/process
, где будет добавлено задание, несколько раз обновите страницу, чтобы зарегистрировать несколько заданий, а затем просмотрите сгенерированные журналы консоли, чтобы просмотреть ход выполнения заданий.
ИЛИ перейдите по адресу http://localhost:3000/queues
, чтобы просмотреть панель мониторинга, на которой будут перечислены все сведения о заданиях в очереди.
Чего мы достигли
- Создано демонстрационное приложение на основе NestJS для регистрации и использования заданий на основе очередей Redis.
- Dockerized приложение (с поддержкой горячей перезагрузки)
- Добавлена панель мониторинга для отслеживания всех заданий в наших очередях.
Полный код доступен по адресу — https://github.com/qaribhaider/nestjs-redis-queues.
Пожалуйста, напишите в комментариях свои мнения, отзывы или любую конкретную тему, которую вы хотите изучить.
Ваше здоровье!