Поиск по сайту:

Как обрабатывать асинхронные задачи с помощью Node.js и BullMQ


Автор выбрал программу Write for DOnations.

Введение

В веб-приложениях есть поток Node.js, пока задача не будет завершена. Это может занять несколько секунд или минут. Пользователи должны дождаться завершения задачи, чтобы получить ответ от сервера.

Чтобы избежать замедления цикла запроса/ответа, вы можете использовать рабочий процесс bullmq, а затем удалять из очереди и выполнять каждое задание в очереди, помечая его как завершенное после выполнения.

В этой статье вы будете использовать bullmq, чтобы перевести трудоемкую задачу в фоновый режим, что позволит приложению быстро реагировать на запросы пользователей. Во-первых, вы создадите приложение с трудоемкой задачей без использования bullmq. Затем вы будете использовать bullmq для асинхронного выполнения задачи. Наконец, вы установите визуальную панель управления для управления заданиями bullmq в очереди Redis.

Предпосылки

Чтобы следовать этому руководству, вам понадобится следующее:

  • Настроена среда разработки Node.js. Для Ubuntu 22.04 следуйте нашему руководству по установке Node.js и созданию локальной среды разработки.
  • Redis установлен в вашей системе. В Ubuntu 22 выполните шаги с 1 по 3 в нашем руководстве «Как установить и защитить Redis».
  • Знакомство с циклом обработки событий, обратными вызовами, промисами и Async/Await в JavaScript.
  • Базовые знания о том, как использовать Как начать работу с Node.js и Express.
  • Знакомство с How To Use EJS to Template Your Node Application для более подробной информации.
  • Базовое понимание того, как обрабатывать изображения с помощью Как обрабатывать изображения в Node.js с помощью Sharp.

Шаг 1 — Настройка каталога проекта

На этом шаге вы создадите каталог и установите необходимые зависимости для вашего приложения. Приложение, которое вы создадите в этом руководстве, позволит пользователям загружать изображение, которое затем обрабатывается с помощью пакета sharp. Обработка изображений занимает много времени и может замедлить цикл запроса/ответа, что делает задачу хорошим кандидатом для того, чтобы bullmq переместился в фоновый режим. Техника, которую вы будете использовать для разгрузки задачи, также будет работать для других трудоемких задач.

Для начала создайте каталог с именем image_processor и перейдите в него:

  1. mkdir image_processor && cd image_processor

Затем инициализируйте каталог как пакет npm:

  1. npm init -y

Команда создает файл package.json. Параметр -y указывает npm принять все значения по умолчанию.

После запуска команды ваш вывод будет соответствовать следующему:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

Вывод подтверждает, что файл package.json создан. Важные свойства включают имя вашего приложения (name), номер версии вашего приложения (version) и начальную точку вашего проекта (main). ). Если вы хотите узнать больше о других свойствах, вы можете просмотреть документацию package.json npm.

Приложение, которое вы создадите в этом руководстве, потребует следующих зависимостей:

  • express: веб-платформа для создания веб-приложений.
  • express-fileupload: промежуточное ПО, которое позволяет вашим формам загружать файлы.
  • sharp: библиотека для обработки изображений.
  • ejs: язык шаблонов, позволяющий создавать HTML-разметку с помощью Node.js.
  • bullmq: распределенная очередь задач.
  • bull-board: информационная панель, основанная на bullmq и отображающая статус заданий с помощью приятного пользовательского интерфейса.

Чтобы установить все эти зависимости, выполните следующую команду:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

В дополнение к установленным вами зависимостям вы также будете использовать следующее изображение позже в этом руководстве:

Используйте curl, чтобы загрузить изображение в выбранное вами место на локальном компьютере.

  1. curl -O https://deved-images.nyc3.digitaloceanspaces.com/CART-68886/underwater.png

У вас есть необходимые зависимости для создания приложения Node.js без bullmq, что вы и сделаете дальше.

Шаг 2 — Реализация трудоемкой задачи без bullmq

На этом этапе вы создадите приложение с помощью Express, которое позволит пользователям загружать изображения. Приложение запустит трудоемкую задачу, используя sharp, чтобы изменить размер изображения на несколько размеров, которые затем отображаются пользователю после отправки ответа. Этот шаг поможет вам понять, как трудоемкие задачи влияют на цикл запроса/ответа.

С помощью nano или предпочитаемого вами текстового редактора создайте файл index.js:

  1. nano index.js

В файле index.js добавьте следующий код для импорта зависимостей:

const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

В первой строке вы импортируете модуль path для вычисления путей к файлам с помощью Node. Во второй строке вы импортируете модуль fs для взаимодействия с каталогами. Затем вы импортируете веб-фреймворк express. Вы импортируете модуль body-parser, чтобы добавить промежуточное ПО для анализа данных в HTTP-запросах. После этого вы импортируете модуль sharp для обработки изображений. Наконец, вы импортируете express-fileupload для обработки загрузок из HTML-формы.

Затем добавьте следующий код для реализации промежуточного ПО в вашем приложении:

...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

Во-первых, вы устанавливаете переменную app в экземпляр Express. Во-вторых, используя переменную app, метод set() настраивает Express для использования языка шаблонов ejs. Затем вы добавляете промежуточное ПО модуля body-parser с методом use() для преобразования данных JSON в HTTP-запросах в переменные, к которым можно получить доступ с помощью JavaScript. В следующей строке вы делаете то же самое с вводом в кодировке URL.

Затем добавьте следующие строки, чтобы добавить дополнительное ПО промежуточного слоя для обработки загрузки файлов и обслуживания статических файлов:

...
app.use(fileUpload());
app.use(express.static("public"));

Вы добавляете промежуточное ПО для анализа загруженных файлов, вызывая метод fileUpload(), и устанавливаете каталог, в котором Express будет просматривать и обслуживать статические файлы, такие как изображения и CSS.

С установленным промежуточным программным обеспечением создайте маршрут, который отображает HTML-форму для загрузки изображения:

...
app.get("/", function (req, res) {
  res.render("form");
});

Здесь вы используете метод get() модуля Express, чтобы указать маршрут / и обратный вызов, который должен выполняться, когда пользователь посещает домашнюю страницу или / маршрут. В обратном вызове вы вызываете res.render() для отображения файла form.ejs в каталоге views. Вы еще не создали файл form.ejs или каталог views.

Чтобы создать его, сначала сохраните и закройте файл. В терминале введите следующую команду, чтобы создать каталог views в корневом каталоге вашего проекта:

  1. mkdir views

Перейдите в каталог views:

  1. cd views

Создайте файл form.ejs в своем редакторе:

  1. nano form.ejs

В файле form.ejs добавьте следующий код для создания формы:

<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

Сначала вы ссылаетесь на файл head.ejs, который еще не создали. Файл head.ejs будет содержать HTML-элемент head, на который можно ссылаться на других HTML-страницах.

В теге body вы создаете форму со следующими атрибутами:

  • action указывает маршрут, по которому данные формы должны отправляться при отправке формы.
  • method указывает метод HTTP для отправки данных. Метод POST встраивает данные в HTTP-запрос.
  • encytype определяет способ кодирования данных формы. Значение multipart/form-data позволяет HTML-элементам input загружать данные файла.

В элементе form вы создаете тег input для загрузки файлов. Затем вы определяете элемент button с атрибутом type, для которого задано значение submit, что позволяет отправлять формы.

После завершения сохраните и закройте файл.

Затем создайте файл head.ejs:

  1. nano head.ejs

В файле head.ejs добавьте следующий код, чтобы создать раздел заголовка приложения:

<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

Здесь вы ссылаетесь на файл main.css, который вы создадите в каталоге public позже на этом шаге. Этот файл будет содержать стили для этого приложения. А пока вы продолжите настройку процессов для статических ресурсов.

Сохраните и закройте файл.

Для обработки данных, отправленных из формы, вы должны определить метод post в Express. Для этого вернитесь в корневой каталог вашего проекта:

  1. cd ..

Снова откройте файл index.js:

  1. nano index.js

В файле index.js добавьте выделенные строки, чтобы определить метод обработки отправки форм по маршруту /upload:

app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

Вы используете переменную app для вызова метода post(), который будет обрабатывать отправленную форму на маршруте /upload. Затем вы извлекаете данные загруженного изображения из HTTP-запроса в переменную image. После этого вы устанавливаете ответ для возврата кода состояния 400, если пользователь не загружает изображение.

Чтобы настроить процесс для загруженного изображения, добавьте следующий выделенный код:

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

Эти строки показывают, как ваше приложение будет обрабатывать изображение. Сначала вы удаляете расширение изображения из загруженного изображения и сохраняете имя в переменной imageName. Затем вы определяете функцию processImage(). Эта функция принимает параметр size, значение которого будет использоваться для определения размеров изображения при изменении размера. В функции вы вызываете sharp() с image.data, который является форматом изображения webp. Затем вы сохраняете изображение в каталоге public/images/.

Последующий список чисел определяет размеры, которые будут использоваться для изменения размера загруженного изображения. Затем вы используете метод JavaScript map() для вызова processImage() для каждого элемента в массиве sizes, после чего он возвращает новый массив . Каждый раз, когда метод map() вызывает функцию processImage(), он возвращает обещание для нового массива. Вы используете метод Promise.all() для их разрешения.

Скорость компьютерной обработки различается, как и размер изображений, которые пользователь может загрузить, что может повлиять на скорость обработки изображений. Чтобы отложить этот код в демонстрационных целях, вставьте выделенные строки, чтобы добавить цикл приращения с интенсивным использованием ЦП и перенаправление на страницу, которая будет отображать изображения с измененным размером с выделенными строками:

...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

Цикл будет выполняться 10 миллиардов раз, чтобы увеличить переменную counter. Вы вызываете функцию res.redirect(), чтобы перенаправить приложение на маршрут /result. Маршрут отобразит HTML-страницу, на которой будут отображаться изображения из каталога public/images.

Маршрут /result еще не существует. Чтобы создать его, добавьте выделенный код в файл index.js:

...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

Вы определяете маршрут /result с помощью метода app.get(). В функции вы определяете переменную imgDirPath с полным путем к каталогу public/images. Вы используете метод readdirSync() модуля fs для чтения всех файлов в данном каталоге. Оттуда вы связываете метод map(), чтобы вернуть новый массив с путями к изображениям с префиксом images/.

Наконец, вы вызываете res.render() для рендеринга файла result.ejs, которого еще не существует. Вы передаете переменную imgFiles, содержащую массив всех относительных путей изображения, в файл result.ejs.

Сохраните и закройте файл.

Чтобы создать файл result.ejs, вернитесь в каталог views:

  1. cd views

Создайте и откройте файл result.ejs в своем редакторе:

  1. nano result.ejs

В файле result.ejs добавьте следующие строки для отображения изображений:

<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

Сначала вы ссылаетесь на файл head.ejs. В теге body вы проверяете, пуста ли переменная imgFiles. Если у него есть данные, вы перебираете каждый файл и создаете изображение для каждого элемента массива. Если imgFiles пуст, вы печатаете сообщение, в котором пользователю предлагается Обновить через несколько секунд, чтобы просмотреть изображения с измененным размером..

Сохраните и закройте файл.

Затем вернитесь в корневой каталог и создайте каталог public, который будет содержать ваши статические ресурсы:

  1. cd .. && mkdir public

Перейдите в каталог public:

  1. cd public

Создайте каталог images, в котором будут храниться загруженные изображения:

  1. mkdir images

Затем создайте каталог css и перейдите к нему:

  1. mkdir css && cd css

В редакторе создайте и откройте файл main.css, на который вы ссылались ранее в файле head.ejs:

  1. nano main.css

В файле main.css добавьте следующие стили:

body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Styles for the "Choose File"  button **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Styles for the "Upload Image"  button **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

Эти строки будут стилизовать элементы в приложении. Используя HTML-атрибуты, вы определяете стиль фона кнопки «Выбрать файл» с помощью шестнадцатеричного кода #2196f3 (оттенок синего) и границы кнопки «Загрузить изображение» в оранжевый. Вы также стилизуете элементы на маршруте /result, чтобы сделать их более презентабельными.

После завершения сохраните и закройте файл.

Вернитесь в корневой каталог проекта:

  1. cd ../..

Откройте index.js в своем редакторе:

  1. nano index.js

В свой index.js добавьте следующий код, который запустит сервер:

...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Полный файл index.js теперь будет соответствовать следующему:

const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

Когда вы закончите вносить изменения, сохраните и закройте файл.

Запустите приложение с помощью команды node:

  1. node index.js

Вы получите такой вывод:

Output
Server running on port 3000

Этот вывод подтверждает, что сервер работает без проблем.

Откройте предпочитаемый браузер и посетите http://localhost:3000/.

Примечание. Если вы выполняете руководство на удаленном сервере, вы можете получить доступ к приложению в локальном браузере с помощью переадресации портов.

Пока сервер Node.js работает, откройте другой терминал и введите следующую команду:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

После подключения к серверу запустите node index.js, а затем перейдите к http://localhost:3000/ в веб-браузере вашего локального компьютера.

Когда страница загрузится, она будет соответствовать следующему:

Затем нажмите кнопку «Выбрать файл» и выберите изображение underwater.png на локальном компьютере. Отображение переключится с «Файл не выбран» на «подводный файл.png». После этого нажмите кнопку Загрузить изображение. Приложение будет загружаться некоторое время, поскольку оно обрабатывает изображение и запускает цикл увеличения.

После завершения задачи маршрут /result загрузится с изображениями измененного размера:

Теперь вы можете остановить сервер с помощью CTRL+C. Node.js не перезагружает сервер автоматически при изменении файлов, поэтому вам нужно будет останавливать и перезапускать сервер всякий раз, когда вы обновляете файлы.

Теперь вы знаете, как трудоемкая задача может повлиять на цикл запроса/ответа приложения. Далее вы будете выполнять задачу асинхронно.

Шаг 3 — Асинхронное выполнение трудоемких задач с помощью bullmq

На этом шаге вы перенесете трудоемкую задачу в фоновый режим с помощью bullmq. Эта корректировка освободит цикл запроса/ответа и позволит вашему приложению немедленно отвечать пользователям во время обработки изображения.

Для этого вам нужно создать краткое описание задания и добавить его в очередь с помощью bullmq. В процессе первым поступил - первым обслужен (FIFO) первый элемент, добавленный в очередь, является первым элементом, подлежащим удалению (удаление из очереди). С помощью bullmq производитель добавит задание в очередь, а потребитель (или работник) удалит задание из очереди и выполнить его.

Очередь в bullmq находится в Redis. Когда вы описываете задание и добавляете его в очередь, запись для задания создается в очереди Redis. Описание задания может быть строкой или объектом со свойствами, содержащими минимальные данные или ссылки на данные, которые позволят bullmq выполнить задание позже. Как только вы определите функциональность для добавления заданий в очередь, вы переместите трудоемкий код в отдельную функцию. Позже bullmq вызовет эту функцию с данными, которые вы сохранили в очереди, когда задание будет удалено из очереди. Как только задача завершится, bullmq пометит ее как выполненную, достанет еще одну задачу из очереди и выполнит ее.

Откройте index.js в своем редакторе:

  1. nano index.js

В файле index.js добавьте выделенные строки, чтобы создать очередь в Redis с помощью bullmq:

...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Вы начинаете с извлечения класса Queue из bullmq, который используется для создания очереди в Redis. Затем вы задаете для переменной redisOptions объект со свойствами, которые экземпляр класса Queue будет использовать для установления соединения с Redis. Вы устанавливаете для свойства host значение localhost, поскольку Redis работает на вашем локальном компьютере.

Примечание. Если Redis работает на удаленном сервере отдельно от вашего приложения, вы должны обновить значение свойства host, указав IP-адрес удаленного сервера. Вы также устанавливаете для свойства port значение 6379 — порт по умолчанию, который Redis использует для прослушивания подключений.

Если вы настроили переадресацию портов на удаленный сервер, на котором запущены Redis и приложение вместе, вам не нужно обновлять свойство host, но вам нужно будет использовать соединение для переадресации портов каждый раз, когда вы входите в систему. на ваш сервер для запуска приложения.

Затем вы устанавливаете переменную imageJobQueue в экземпляр класса Queue, принимая имя очереди в качестве первого аргумента и объект в качестве второго аргумента. Объект имеет свойство connection со значением, установленным для объекта в переменной redisOptions. После создания экземпляра класса Queue в Redis будет создана очередь с именем imageJobQueue.

Наконец, вы определяете функцию addJob(), которую будете использовать для добавления задания в imageJobQueue. Функция принимает параметр job, содержащий информацию о задании u2060 (вы вызовете функцию addJob() с данными, которые хотите сохранить в очереди). В этой функции вы вызываете метод add() очереди imageJobQueue, принимая имя задания в качестве первого аргумента и данные задания в качестве второго аргумента.

Добавьте выделенный код для вызова функции addJob() для добавления задания в очередь:

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Здесь вы вызываете функцию addJob() с объектом, описывающим задание. Объект имеет атрибут type со значением имени задания. Второе свойство, image, задается для объекта, содержащего данные изображения, загруженные пользователем. Поскольку данные изображения в image.data находятся в буфере (двоичной форме), вы вызываете метод JavaScript toString(), чтобы преобразовать их в строку, которую можно сохранить в Redis. , который в результате установит свойство data. Свойству image присваивается имя загруженного изображения (включая расширение изображения).

Теперь вы определили информацию, необходимую bullmq для выполнения этого задания позже. В зависимости от вашей работы вы можете добавить больше информации о работе или меньше.

Предупреждение. Поскольку Redis — это база данных в памяти, избегайте хранения больших объемов данных для заданий в очереди. Если у вас есть большой файл, который необходимо обработать для задания, сохраните файл на диске или в облаке, а затем сохраните ссылку на файл в виде строки в очереди. Когда bullmq выполняет задание, он получает файл по ссылке, сохраненной в Redis.

Сохраните и закройте файл.

Затем создайте и откройте файл utils.js, который будет содержать код обработки изображения:

  1. nano utils.js

В файле utils.js добавьте следующий код, чтобы определить функцию обработки изображения:

const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

Вы импортируете модули, необходимые для обработки изображений и вычисления путей в первых двух строках. Затем вы определяете функцию processUploadedImages(), которая будет содержать трудоемкую задачу обработки изображений. Эта функция принимает параметр job, который будет заполнен, когда рабочий процесс извлекает данные задания из очереди, а затем вызывает функцию processUploadedImages() с данными очереди. Вы также экспортируете функцию processUploadedImages(), чтобы на нее можно было ссылаться в других файлах.

Сохраните и закройте файл.

Вернитесь к файлу index.js:

  1. nano index.js

Скопируйте выделенные строки из файла index.js, затем удалите их из этого файла. Вам понадобится скопированный код на мгновение, поэтому сохраните его в буфер обмена. Если вы используете nano, вы можете выделить эти строки и щелкнуть правой кнопкой мыши, чтобы скопировать строки:

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

Метод post для маршрута upload теперь будет соответствовать следующему:

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

Сохраните и закройте этот файл, затем откройте файл utils.js:

  1. nano utils.js

В файле utils.js вставьте только что скопированные строки для обратного вызова маршрута /upload в функцию processUploadedImages:

...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

Теперь, когда вы переместили код для обработки изображения, вам нужно обновить его, чтобы использовать данные изображения из параметра job функции processUploadedImages(), которую вы определили ранее.

Для этого добавьте и обновите выделенные строки ниже:


function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

Вы преобразуете строковую версию данных изображения обратно в двоичную с помощью метода Buffer.from(). Затем вы обновляете path.parse() ссылкой на имя изображения, сохраненное в очереди. После этого вы обновляете метод sharp(), чтобы он брал двоичные данные изображения, хранящиеся в переменной imageFileData.

Полный файл utils.js теперь будет соответствовать следующему:

const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

Сохраните и закройте файл, затем вернитесь к index.js:

  1. nano index.js

Переменная sharp больше не нужна как зависимость, так как изображение теперь обрабатывается в файле utils.js. Удалите выделенную строку из файла:

const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

Сохраните и закройте файл.

Теперь вы определили функциональность для создания очереди в Redis и добавления задания. Вы также определили функцию processUploadedImages() для обработки загруженных изображений.

Осталось создать потребителя (или работника), который будет извлекать задание из очереди и вызывать функцию processUploadedImages() с параметром данные о работе.

Создайте в редакторе файл worker.js:

  1. nano worker.js

В файле worker.js добавьте следующий код:

const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

В первой строке вы импортируете класс Worker из bullmq; при создании экземпляра запускается рабочий процесс, который удаляет задания из очереди в Redis и выполняет их. Затем вы ссылаетесь на функцию processUploadedImages() из файла utils.js, чтобы рабочий процесс мог вызвать функцию с данными в очереди.

Вы определяете функцию workerHandler(), которая принимает параметр job, содержащий данные задания в очереди. В функции вы регистрируете запуск задания, а затем вызываете processUploadedImages() с данными задания. После этого вы регистрируете сообщение об успехе и возвращаете null.

Чтобы разрешить рабочему процессу подключаться к Redis, удалять задание из очереди и вызывать workerHandler() с данными задания, добавьте в файл следующие строки:

...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

Здесь вы устанавливаете переменную workerOptions для объекта, содержащего настройки подключения Redis. Вы устанавливаете переменную worker в экземпляр класса Worker, который принимает следующие параметры:

  • imageJobQueue: имя очереди заданий.
  • workerHandler: функция, которая запускается после удаления задания из очереди Redis.
  • workerOptions: параметры конфигурации Redis, которые рабочий процесс использует для установления соединения с Redis.

Наконец, вы регистрируете сообщение об успехе.

После добавления строк сохраните и закройте файл.

Теперь вы определили функциональные возможности рабочего процесса bullmq для удаления заданий из очереди и их выполнения.

В своем терминале удалите изображения из каталога public/images, чтобы вы могли начать заново для тестирования своего приложения:

  1. rm public/images/*

Затем запустите файл index.js:

  1. node index.js

Приложение запустится:

Output
Server running on port 3000

Теперь вы запустите рабочего. Откройте второй сеанс терминала и перейдите непосредственно к проекту:

  1. cd image_processor/

Запустите воркер с помощью следующей команды:

  1. node worker.js

Рабочий начнет:

Output
Worker started!

Посетите http://localhost:3000/ в своем браузере. Нажмите кнопку «Выбрать файл» и выберите underwater.png на своем компьютере, затем нажмите кнопку «Загрузить изображение».

Вы можете получить мгновенный ответ с предложением обновить страницу через несколько секунд:

Кроме того, вы можете получить мгновенный ответ с некоторыми обработанными изображениями на странице, в то время как другие все еще обрабатываются:

Вы можете обновить страницу несколько раз, чтобы загрузить все измененные изображения.

Вернитесь к терминалу, где работает ваш воркер. Этот терминал будет иметь сообщение, которое соответствует следующему:

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

Вывод подтверждает, что bullmq успешно выполнил задание.

Ваше приложение может по-прежнему разгружать ресурсоемкие задачи, даже если рабочий процесс не запущен. Чтобы продемонстрировать это, остановите рабочий процесс во втором терминале с помощью CTRL+C.

В первом сеансе терминала остановите сервер Express и удалите изображения из public/images:

  1. rm public/images/*

После этого снова запустите сервер:

  1. node index.js

В браузере откройте http://localhost:3000/ и снова загрузите изображение underwater.png. Когда вы будете перенаправлены на путь /result, изображения не будут отображаться на странице, потому что рабочий процесс не запущен:

Вернитесь в терминал, где вы запускали воркер, и снова запустите воркера:

  1. node worker.js

Вывод будет соответствовать следующему, что позволит вам узнать, что задание запущено:

Output
Worker started! Starting job: processUploadedImages

После того, как задание будет завершено и в выводе появится строка Finished job: processUploadedImages, обновите браузер. Теперь изображения будут загружаться:

Остановите сервер и рабочий процесс.

Теперь вы можете перевести трудоемкую задачу в фоновый режим и выполнить ее асинхронно с помощью bullmq. На следующем шаге вы настроите панель мониторинга для отслеживания состояния очереди.

Шаг 4 — Добавление панели инструментов для мониторинга очередей bullmq

На этом шаге вы будете использовать пакет bull-board для мониторинга заданий в очереди Redis с визуальной панели. Этот пакет автоматически создаст панель мониторинга пользовательского интерфейса (UI), которая отображает и упорядочивает информацию о заданиях bullmq, хранящихся в очереди Redis. С помощью браузера вы можете отслеживать завершенные, ожидающие или невыполненные задания, не открывая интерфейс командной строки Redis в терминале.

Откройте файл index.js в текстовом редакторе:

  1. nano index.js

Добавьте выделенный код для импорта bull-board:

...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

В предыдущем коде вы импортируете метод createBullBoard() из bull-board. Вы также импортируете BullMQAdapter, который позволяет bull-board получать доступ к очередям bullmq, и ExpressAdapter, который предоставляет функциональные возможности для Express для отображения приборной панели.

Затем добавьте выделенный код для подключения bull-board к bullmq:

...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

Сначала вы устанавливаете serverAdapter в экземпляр ExpressAdapter. Затем вы вызываете createBullBoard() для инициализации информационной панели с данными очереди bullmq. Вы передаете функции объектный аргумент со свойствами queues и serverAdapter. Первое свойство, queues, принимает массив очередей, определенных вами с помощью bullmq, который здесь является imageJobQueue. Второе свойство, serverAdapter, содержит объект, который принимает экземпляр серверного адаптера Express. После этого вы устанавливаете путь /admin для доступа к информационной панели с помощью метода setBasePath().

Затем добавьте промежуточное ПО serverAdapter для маршрута /admin:

app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

Полный файл index.js будет соответствовать следующему:

const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

После внесения изменений сохраните и закройте файл.

Запустите файл index.js:

  1. node index.js

Вернитесь в браузер и посетите http://localhost:3000/admin. Панель управления загрузится:

На панели мониторинга вы можете просмотреть тип задания, данные, которые оно потребляет, и дополнительную информацию о задании. Вы также можете переключаться на другие вкладки, например на вкладку «Завершено» для получения информации о завершенных заданиях, на вкладку «Сбой» для получения дополнительных сведений о невыполненных заданиях и на вкладку «Приостановлено» для получения дополнительных сведений о приостановленных заданиях.

Теперь вы можете использовать панель управления bull-board для мониторинга очередей.

Заключение

В этой статье вы перенесли трудоемкую задачу в очередь заданий с помощью bullmq. Во-первых, без использования bullmq вы создали приложение с трудоемкой задачей, которая имеет медленный цикл запроса/ответа. Затем вы использовали bullmq, чтобы разгрузить трудоемкую задачу и выполнить ее асинхронно, что увеличивает цикл запроса/ответа. После этого вы использовали bull-board для создания панели управления для мониторинга очередей bullmq в Redis.

Вы можете посетить документацию bull-board, чтобы узнать больше о функциях информационной панели.