Очереди могут пригодиться для сглаживания пиков обработки для задач, обработка которых может занять время или блокировать цикл событий node.js.

Испытание

Допустим, у нас есть конечная точка, которая обрабатывает некоторые данные, что занимает более нескольких секунд; это могут быть сложные вычисления, обработка видео, перекодирование звука и т. д. Если несколько пользователей одновременно отправляют запросы к этой конечной точке, это может заблокировать цикл событий, и пользователи не получат своевременный ответ, а не получат постоянно загружаемую страницу.

Таким образом, один из способов решения этой проблемы — поддерживать очередь, которая может хранить поступающие запросы и обрабатывать их асинхронно.

Мы бы построили это в рамках NestJS, используя Redis для поддержки очередей через пакет @nestjs/bull и сторонний модуль для мониторинга очередей, чтобы мы могли следить за состоянием наших очередей.

Предварительные условия

Настраивать

1- Создайте новое приложение NestJS, используя CLI.

nest new testqueues

# In this demo we have used yarn in the next step 
# where it asks for the package manager selection

2- Установить зависимости

Выполните приведенные ниже команды в папке приложения, т. е. testqueues

# Install nestjs bull package
yarn add @nestjs/bull bull 

# Install bull-board package which will be used for queues monitoring
yarn add @bull-board/api @bull-board/express @bull-board/nestjs 

Докеризовать

Мы будем использовать Docker для запуска приложения, откроем каталог приложения в вашей любимой IDE, а затем создадим Dockerfile со следующим содержимым:

FROM node:18-alpine

# Set working directory
WORKDIR /usr/src/app

# Install NestJS globally
RUN npm install -g @nestjs/cli

# Copy package.json and package-lock.json to working directory
COPY package*.json yarn.lock ./

# Install dependencies
RUN yarn install --frozen-lockfile

# Copy source code to working directory
COPY . .

# Expose the default port for the NestJS application
EXPOSE 3000

# Start the NestJS application in development mode
CMD ["npm", "run", "start:dev"]

Обратите внимание, что этот Dockerfile подходит только для среды разработки

Теперь создайте файл docker-compose.yml со следующим содержимым:

version: '3.9'
services:
  app:
    build:
      context: ./
    ports:
      - '3000:3000'
    volumes:
      - ./:/usr/src/app
    command: npm run start:dev
  redis:
    container_name: redis_testqueues
    image: 'redis:latest'
    ports:
      - '6379:6379'

С помощью приведенного выше файла компоновки мы создаем две службы; app для нашего приложения Nestjs и redis для запуска Redis с открытым портом 6379. Если какой-либо из портов недоступен на вашем компьютере, измените его здесь, а затем отразите это соответствующим образом в коде приложения.

Запустите команду ниже, чтобы запустить приложение:

docker-compose up

Давайте кодировать

Создайте файл src/constants.ts с константой для хранения имени нашей очереди Redis:

export const REDIS_QUEUE_NAME = 'qdataprocess';

Затем импортируйте необходимые модули в src/app.module.ts, добавьте следующее в массив импорта (который, скорее всего, сейчас будет пустым):

  imports: [
    BullModule.forRoot({
      redis: {
        host: 'redis_testqueues', // use the service name from docker-compose if its different
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: REDIS_QUEUE_NAME,
    }),
    BullBoardModule.forRoot({
      route: '/queues',
      adapter: ExpressAdapter, // Or FastifyAdapter from `@bull-board/fastify`
    }),
    BullBoardModule.forFeature({
      name: REDIS_QUEUE_NAME,
      adapter: BullAdapter,
    }),
  ],

Добавьте импорт для добавленных нами модулей:

import { ExpressAdapter } from '@bull-board/express';
import { BullBoardModule } from '@bull-board/nestjs';
import { BullModule } from '@nestjs/bull';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { REDIS_QUEUE_NAME } from './constants';

Теперь, когда необходимые модули импортированы, давайте обновим файл app.service.ts и добавим метод для обработки заданий:

import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { REDIS_QUEUE_NAME } from './constants';
import { Queue } from 'bull';

@Injectable()
export class AppService {
  constructor(@InjectQueue(REDIS_QUEUE_NAME) private queue: Queue) {}

  getHello(): string {
    return 'Hello World!';
  }

  async processData() {
    return await this.queue.add(
      'process_data',
      { custom_id: Math.floor(Math.random() * 10000000) },
      { priority: 1 },
    );
  }
}

В основном мы добавили метод processData(), который впоследствии использует queue.add, первый аргумент — это уникальное имя для каждого задания, а второй — данные; здесь мы просто отправляем несколько случайных чисел, чтобы продемонстрировать, как данные могут быть переданы, а затем получены позже потребителем этого задания (мы доберемся до этого чуть позже)

Теперь обновите файл app.controller.ts, добавим маршрут /process, который затем будет использовать наш сервисный метод:

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }

  @Get('process')
  async processData() {
    return this.appService.processData();
  }
}

Итак, теперь наше приложение способно получать задания на конечной точке /process, мы добавим потребителя, который в основном обрабатывает полученные задания и обрабатывает их.

Добавьте файл process-data.consumer.ts со следующим содержимым:

import {
  Processor,
  Process,
  OnQueueActive,
  OnQueueCompleted,
} from '@nestjs/bull';
import { Job } from 'bull';
import { REDIS_QUEUE_NAME } from './constants';
import { Logger } from '@nestjs/common';

@Processor(REDIS_QUEUE_NAME)
export class ProcessDataConsumer {
  @Process('process_data')
  async processData() {
    // Perform the job
    // This is just a sample long running process
    // will take between 5 to 10 seconds to finish
    await new Promise((resolve, reject) => {
      try {
        setTimeout(
          () => {
            resolve('Data processed');
          },
          5000 + Math.floor(Math.random() * 5000),
        );
      } catch (error) {
        reject(error);
      }
    });

    return { done: true };
  }

  @OnQueueActive()
  onActive(job: Job<unknown>) {
    // Log that job is starting
    Logger.log(`Starting job ${job.id} : ${job.data['custom_id']}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job<unknown>) {
    // Log job completion status
    Logger.log(`Job ${job.id} has been finished`);
  }
}

Ключевые моменты, на которые следует обратить внимание:

  • @Processor(REDIS_QUEUE_NAME) помечает наш текущий класс как потребителя зарегистрированной нами очереди Redis.
  • process_data внутри @Process(‘process_data’) — это имя задания, которое мы определили в нашем сервисном методе.
  • processData() — это основная функция, которая отвечает за обработку наших данных. Это может быть что угодно, например отправка электронного письма, выполнение некоторых операций с данными, обработка видеокадров, создание миниатюр и т. д. Для демонстрационных целей мы просто создаем обещание, которое будет решено в течение от 5 до 10 секунд

Теперь нам нужно зарегистрировать этого потребителя, для этого отредактируйте файл app.module.ts и добавьте ProcessDataConsumer в массив поставщиков (не забудьте также импортировать потребителя):

providers: [AppService, ProcessDataConsumer],

Наконец, вы можете протестировать приложение, посетив http://localhost:3000/process, где будет добавлено задание, несколько раз обновите страницу, чтобы зарегистрировать несколько заданий, а затем просмотрите сгенерированные журналы консоли, чтобы просмотреть ход выполнения заданий.

ИЛИ перейдите по адресу http://localhost:3000/queues, чтобы просмотреть панель мониторинга, на которой будут перечислены все сведения о заданиях в очереди.

Чего мы достигли

  • Создано демонстрационное приложение на основе NestJS для регистрации и использования заданий на основе очередей Redis.
  • Dockerized приложение (с поддержкой горячей перезагрузки)
  • Добавлена ​​панель мониторинга для отслеживания всех заданий в наших очередях.

Полный код доступен по адресу — https://github.com/qaribhaider/nestjs-redis-queues.

Пожалуйста, напишите в комментариях свои мнения, отзывы или любую конкретную тему, которую вы хотите изучить.

Ваше здоровье!