Поиск по сайту:

Как использовать многопоточность в Node.js


Автор выбрал программу Write for DOnations.

Введение

Библиотека libuv, которая обрабатывает операции ввода-вывода, такие как чтение файлов с диска или сетевые запросы. Благодаря использованию скрытых потоков Node.js предоставляет асинхронные методы, которые позволяют вашему коду выполнять запросы ввода-вывода, не блокируя основной поток.

Хотя в Node.js есть скрытые потоки, их нельзя использовать для разгрузки ресурсоемких задач, таких как сложные вычисления, изменение размера изображения или сжатие видео. Поскольку JavaScript является однопоточным, когда выполняется задача с интенсивным использованием ЦП, он блокирует основной поток, и никакой другой код не выполняется до тех пор, пока задача не завершится. Единственный способ ускорить задачу, связанную с ЦП, без использования других потоков — увеличить скорость процессора.

Однако в последние годы процессоры не становятся быстрее. Вместо этого компьютеры поставляются с дополнительными ядрами, и теперь компьютеры чаще имеют 8 или более ядер. Несмотря на эту тенденцию, ваш код не будет использовать преимущества дополнительных ядер на вашем компьютере для ускорения задач, связанных с ЦП, или для предотвращения прерывания основного потока, поскольку JavaScript является однопоточным.

Чтобы исправить это, Node.js представил модуль worker-threads, который позволяет создавать потоки и выполнять несколько задач JavaScript параллельно. Как только поток завершает задачу, он отправляет в основной поток сообщение, содержащее результат операции, чтобы его можно было использовать с другими частями кода. Преимущество использования рабочих потоков заключается в том, что задачи, связанные с процессором, не блокируют основной поток, и вы можете разделить и распределить задачу между несколькими рабочими потоками для ее оптимизации.

В этом руководстве вы создадите приложение Node.js с задачей, интенсивно использующей ЦП, которая блокирует основной поток. Затем вы воспользуетесь модулем worker-threads, чтобы разгрузить задачу, интенсивно использующую ЦП, в другой поток, чтобы избежать блокировки основного потока. Наконец, вы разделите задачу, связанную с ЦП, и четыре потока будут работать над ней параллельно, чтобы ускорить задачу.

Предпосылки

Для выполнения этого урока вам понадобятся:

  • Многоядерная система с четырьмя или более ядрами. Вы по-прежнему можете следовать инструкциям с шагов 1 по 6 в двухъядерной системе. Однако для шага 7 требуется четыре ядра, чтобы увидеть улучшения производительности.
  • Среда разработки Node.js. Если вы используете Ubuntu 22.04, установите последнюю версию Node.js, выполнив шаг 3 раздела «Как установить Node.js и создать локальную среду разработки».
  • Хорошее понимание цикла событий, обратных вызовов и обещаний в JavaScript, которое вы можете найти в нашем руководстве, Понимание цикла событий, обратных вызовов, обещаний и Async/Await в JavaScript.
  • Базовые знания о том, как использовать веб-фреймворк Express. Ознакомьтесь с нашим руководством «Как начать работу с Node.js и Express».

Настройка проекта и установка зависимостей

На этом шаге вы создадите каталог проекта, инициализируете npm и установите все необходимые зависимости.

Для начала создайте и переместите в каталог проекта:

  1. mkdir multi-threading_demo
  2. cd multi-threading_demo

Команда mkdir создает каталог, а команда cd изменяет рабочий каталог на вновь созданный.

После этого инициализируйте каталог проекта с помощью npm с помощью команды npm init:

  1. npm init -y

Параметр -y принимает все параметры по умолчанию.

Когда команда запустится, ваш вывод будет выглядеть примерно так:

Wrote to /home/sammy/multi-threading_demo/package.json:

{
  "name": "multi-threading_demo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Затем установите express, веб-фреймворк Node.js:

  1. npm install express

Вы будете использовать Express для создания серверного приложения с блокирующими и неблокирующими конечными точками.

Node.js поставляется с модулем worker-threads по умолчанию, поэтому вам не нужно его устанавливать.

Теперь вы установили необходимые пакеты. Далее вы узнаете больше о процессах и потоках и о том, как они выполняются на компьютере.

Понимание процессов и потоков

Прежде чем вы начнете писать задачи, привязанные к процессору, и разгружать их по отдельным потокам, вам сначала нужно понять, что такое процессы и потоки, и в чем разница между ними. Самое главное, вы увидите, как процессы и потоки выполняются в одноядерной или многоядерной компьютерной системе.

Процесс

Процесс — это работающая программа в операционной системе. Он имеет собственную память и не может ни видеть, ни обращаться к памяти других запущенных программ. Он также имеет указатель инструкции, который указывает на команду, выполняемую в данный момент в программе. Одновременно может выполняться только одна задача.

Чтобы понять это, вы создадите программу Node.js с бесконечным циклом, чтобы она не завершалась при запуске.

С помощью nano или предпочитаемого вами текстового редактора создайте и откройте файл process.js:

  1. nano process.js

В файле process.js введите следующий код:

const process_name = process.argv.slice(2)[0];

count = 0;
while (true) {
  count++;
  if (count == 2000 || count == 4000) {
    console.log(`${process_name}: ${count}`);
  }
}

В первой строке свойство process.argv возвращает массив, содержащий аргументы командной строки программы. Затем вы присоединяете метод JavaScript slice() с аргументом 2, чтобы сделать неглубокую копию массива, начиная с индекса 2. При этом пропускаются первые два аргумента: путь к Node.js и имя файла программы. Затем вы используете синтаксис скобочной записи для извлечения первого аргумента из массива срезов и сохранения его в переменной process_name.

После этого вы определяете цикл while и передаете ему условие true, чтобы цикл выполнялся вечно. Внутри цикла переменная count увеличивается на 1 во время каждой итерации. За ним следует оператор if, который проверяет, равен ли count 2000 или 4000. Если условие оценивается как истинное, метод console.log() регистрирует сообщение в терминале.

Сохраните и закройте файл с помощью CTRL+X, затем нажмите Y, чтобы сохранить изменения.

Запустите программу с помощью команды node:

  1. node process.js A &

A — это аргумент командной строки, который передается программе и сохраняется в переменной имя_процесса. & в конце позволяет программе Node работать в фоновом режиме, что позволяет вам вводить больше команд в оболочке.

Когда вы запустите программу, вы увидите вывод, подобный следующему:

Output
[1] 7754 A: 2000 A: 4000

Номер 7754 — это идентификатор процесса, присвоенный ему операционной системой. A: 2000 и A: 4000 — это выходные данные программы.

Когда вы запускаете программу с помощью команды node, вы создаете процесс. Операционная система выделяет память для программы, находит исполняемый файл программы на диске вашего компьютера и загружает программу в память. Затем он присваивает ему идентификатор процесса и начинает выполнение программы. В этот момент ваша программа стала процессом.

Когда процесс запущен, его идентификатор процесса добавляется в список процессов операционной системы, и его можно увидеть с помощью таких инструментов, как ps. Инструменты предоставляют более подробную информацию о процессах, а также варианты их остановки или приоритизации.

Чтобы получить краткую сводку о процессе Node, нажмите ENTER в своем терминале, чтобы вернуться к подсказке. Затем запустите команду ps, чтобы увидеть процессы Node:

  1. ps |grep node

Команда ps выводит список всех процессов, связанных с текущим пользователем в системе. Оператор канала | для передачи всего вывода ps в grep фильтрует процессы, чтобы отображать только процессы Node.

Выполнение команды приведет к выводу, подобному следующему:

Output
7754 pts/0 00:21:49 node

Вы можете создать бесчисленное количество процессов из одной программы. Например, используйте следующую команду, чтобы создать еще три процесса с разными аргументами и перевести их в фоновый режим:

  1. node process.js B & node process.js C & node process.js D &

В команде вы создали еще три экземпляра программы process.js. Символ & переводит каждый процесс в фоновый режим.

После выполнения команды вывод будет выглядеть примерно так (хотя порядок может отличаться):

Output
[2] 7821 [3] 7822 [4] 7823 D: 2000 D: 4000 B: 2000 B: 4000 C: 2000 C: 4000

Как видно из вывода, каждый процесс заносил в терминал имя процесса, когда счетчик достигал 2000 и 4000. Каждый процесс не знает о других запущенных процессах: процесс D не знает о процессе C, и наоборот. Все, что происходит в любом процессе, не повлияет на другие процессы Node.js.

Если вы внимательно изучите вывод, то увидите, что порядок вывода отличается от того, который был у вас при создании трех процессов. При выполнении команды аргументы процессов были в порядке B, C и D. Но теперь порядок следующий: D, B и C. Причина в том, что ОС имеет алгоритмы планирования, которые решают, какой процесс выполнять на ЦП в данный момент времени.

На одном ядре процессы выполняются одновременно. То есть операционная система переключается между процессами через равные промежутки времени. Например, процесс D выполняется в течение ограниченного времени, затем его состояние где-то сохраняется, и ОС планирует выполнение процесса B в течение ограниченного времени и так далее. Это происходит взад и вперед, пока все задачи не будут выполнены. На выходе может показаться, что каждый процесс завершен, но на самом деле планировщик ОС постоянно переключается между ними.

В многоядерной системе — при условии, что у вас четыре ядра — ОС планирует выполнение каждого процесса на каждом ядре одновременно. Это называется параллелизм. Однако, если вы создадите еще четыре процесса (доведя общее количество до восьми), каждое ядро будет выполнять два процесса одновременно, пока они не будут завершены.

Потоки

Потоки похожи на процессы: у них есть собственный указатель инструкций, и они могут выполнять одну задачу JavaScript за раз. В отличие от процессов потоки не имеют собственной памяти. Вместо этого они находятся в памяти процесса. Когда вы создаете процесс, он может иметь несколько потоков, созданных с помощью модуля worker_threads, выполняющего код JavaScript параллельно. Кроме того, потоки могут взаимодействовать друг с другом посредством передачи сообщений или совместного использования данных в памяти процесса. Это делает их легкими по сравнению с процессами, поскольку создание потока не требует дополнительной памяти от операционной системы.

Когда дело доходит до выполнения потоков, их поведение похоже на поведение процессов. Если у вас есть несколько потоков, работающих в одноядерной системе, операционная система будет переключаться между ними через равные промежутки времени, давая каждому потоку возможность выполняться непосредственно на одном процессоре. В многоядерной системе ОС распределяет потоки между всеми ядрами и одновременно выполняет код JavaScript. Если вы создадите больше потоков, чем доступно ядер, каждое ядро будет выполнять несколько потоков одновременно.

При этом нажмите ENTER, затем остановите все запущенные в данный момент процессы Node с помощью команды kill:

  1. sudo kill -9 `pgrep node`

pgrep возвращает идентификаторы всех четырех процессов Node в команду kill. Опция -9 указывает kill отправить сигнал SIGKILL.

Когда вы запустите команду, вы увидите вывод, подобный следующему:

Output
[1] Killed node process.js A [2] Killed node process.js B [3] Killed node process.js C [4] Killed node process.js D

Иногда вывод может задерживаться и отображаться позже, когда вы запускаете другую команду.

Теперь, когда вы знаете разницу между процессом и потоком, вы будете работать со скрытыми потоками Node.js в следующем разделе.

Понимание скрытых потоков в Node.js

Node.js предоставляет дополнительные потоки, поэтому он считается многопоточным. В этом разделе вы изучите скрытые потоки в Node.js, которые помогают сделать операции ввода-вывода неблокируемыми.

Как упоминалось во введении, JavaScript является однопоточным, и весь код JavaScript выполняется в одном потоке. Это включает в себя исходный код вашей программы и сторонние библиотеки, которые вы включаете в свою программу. Когда программа выполняет операцию ввода-вывода для чтения файла или сетевого запроса, это блокирует основной поток.

Однако Node.js реализует библиотеку libuv, которая обеспечивает четыре дополнительных потока для процесса Node.js. В этих потоках операции ввода-вывода обрабатываются отдельно, и когда они завершаются, цикл событий добавляет обратный вызов, связанный с задачей ввода-вывода, в очередь микрозадач. Когда стек вызовов в основном потоке очищается, обратный вызов помещается в стек вызовов, а затем выполняется. Чтобы было понятно, обратный вызов, связанный с данной задачей ввода-вывода, не выполняется параллельно; однако сама задача чтения файла или сетевого запроса выполняется параллельно с помощью потоков. После завершения задачи ввода-вывода обратный вызов запускается в основном потоке.

В дополнение к этим четырем потокам движок V8 также предоставляет два потока для обработки таких вещей, как автоматическая сборка мусора. Таким образом, общее количество потоков в процессе достигает семи: один основной поток, четыре потока Node.js и два потока V8.

Чтобы убедиться, что каждый процесс Node.js имеет семь потоков, снова запустите файл process.js и переведите его в фоновый режим:

  1. node process.js A &

Терминал будет регистрировать идентификатор процесса, а также вывод программы:

Output
[1] 9933 A: 2000 A: 4000

Запишите где-нибудь идентификатор процесса и нажмите ENTER, чтобы снова использовать подсказку.

Чтобы увидеть потоки, запустите команду top и передайте ей идентификатор процесса, отображаемый в выводе:

  1. top -H -p 9933

-H указывает top отображать потоки в процессе. Флаг -p указывает top отслеживать только активность в данном идентификаторе процесса.

Когда вы запустите команду, ваш вывод будет выглядеть примерно так:

Output
top - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26 Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie %Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node 9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node 9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node 9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node 9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node 9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node

Как видно из выходных данных, в процессе Node.js всего семь потоков: один основной поток для выполнения JavaScript, четыре потока Node.js и два потока V8.

Как обсуждалось ранее, четыре потока Node.js используются для операций ввода-вывода, чтобы сделать их неблокирующими. Они хорошо подходят для этой задачи, а самостоятельное создание потоков для операций ввода-вывода может даже ухудшить производительность вашего приложения. Этого нельзя сказать о задачах, связанных с процессором. Задача, связанная с ЦП, не использует никаких дополнительных потоков, доступных в процессе, и блокирует основной поток.

Теперь нажмите q, чтобы выйти из top и остановить процесс узла с помощью следующей команды:

  1. kill -9 9933

Теперь, когда вы знаете о потоках в процессе Node.js, в следующем разделе вы напишете задачу, привязанную к процессору, и посмотрите, как она влияет на основной поток.

Создание задачи, привязанной к процессору, без рабочих потоков

В этом разделе вы создадите приложение Express с неблокирующим маршрутом и блокирующим маршрутом, выполняющим задачу, связанную с процессором.

Сначала откройте index.js в предпочитаемом вами редакторе:

  1. nano index.js

В файле index.js добавьте следующий код для создания базового сервера:

const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

В следующем блоке кода вы создаете HTTP-сервер с помощью Express. В первой строке вы импортируете модуль express. Затем вы устанавливаете переменную app для хранения экземпляра Express. После этого вы определяете переменную port, которая содержит номер порта, который сервер должен прослушивать.

После этого вы используете app.get(/non-blocking), чтобы определить маршрут, по которому должны отправляться запросы GET. Наконец, вы вызываете метод app.listen(), чтобы указать серверу начать прослушивание порта 3000.

Затем определите другой маршрут, /blocking/, который будет содержать задачу с интенсивным использованием ЦП:

...
app.get("/blocking", async (req, res) => {
  let counter = 0;
  for (let i = 0; i < 20_000_000_000; i++) {
    counter++;
  }
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

Вы определяете маршрут /blocking с помощью app.get(/blocking), который принимает асинхронный обратный вызов с префиксом ключевого слова async в качестве второй аргумент, запускающий задачу с интенсивным использованием ЦП. Внутри обратного вызова вы создаете цикл for, который повторяется 20 миллиардов раз, и во время каждой итерации он увеличивает переменную counter на 1. Эта задача выполняется на ЦП, и ее выполнение займет пару секунд.

На этом этапе ваш файл index.js будет выглядеть следующим образом:

const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

app.get("/blocking", async (req, res) => {
  let counter = 0;
  for (let i = 0; i < 20_000_000_000; i++) {
    counter++;
  }
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

Сохраните и выйдите из файла, затем запустите сервер с помощью следующей команды:

  1. node index.js

Когда вы запустите команду, вы увидите вывод, подобный следующему:

Output
App listening on port 3000

Это показывает, что сервер запущен и готов к работе.

Теперь зайдите на http://localhost:3000/non-blocking в предпочитаемом вами браузере. Вы увидите мгновенный ответ с сообщением Эта страница не блокируется.

Примечание. Если вы следуете руководству на удаленном сервере, вы можете использовать переадресацию портов для тестирования приложения в браузере.

Пока сервер Express все еще работает, откройте другой терминал на локальном компьютере и введите следующую команду:

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

После подключения к серверу перейдите по адресу http://localhost:3000/non-blocking в веб-браузере вашего локального компьютера. Держите второй терминал открытым на протяжении оставшейся части этого руководства.

Затем откройте новую вкладку и перейдите на http://localhost:3000/blocking. Когда страница загрузится, быстро откройте еще две вкладки и снова посетите http://localhost:3000/non-blocking. Вы увидите, что не получите мгновенного ответа, и страницы будут пытаться загрузиться. Только после того, как маршрут /blocking завершит загрузку и вернет ответ result is 20000000000, остальные маршруты вернут ответ.

Причина, по которой все маршруты /non-blocking не работают при загрузке маршрута /blocking, связана с циклом for, привязанным к процессору. , который блокирует основной поток. Когда основной поток заблокирован, Node.js не может обслуживать какие-либо запросы, пока не завершится задача, привязанная к ЦП. Таким образом, если ваше приложение имеет тысячи одновременных запросов GET к маршруту /non-blocking, достаточно одного посещения маршрута /blocking. требует, чтобы все маршруты приложений не отвечали.

Как видите, блокировка основного потока может повредить работе пользователя с вашим приложением. Чтобы решить эту проблему, вам потребуется разгрузить задачу, связанную с процессором, в другой поток, чтобы основной поток мог продолжать обрабатывать другие HTTP-запросы.

При этом остановите сервер, нажав CTRL+C. Вы снова запустите сервер в следующем разделе после внесения дополнительных изменений в файл index.js. Причина, по которой сервер останавливается, заключается в том, что Node.js не обновляется автоматически при внесении новых изменений в файл.

Теперь, когда вы понимаете, какое негативное влияние может оказать задача, интенсивно использующая ЦП, на ваше приложение, вы попытаетесь избежать блокировки основного потока с помощью промисов.

Разгрузка задачи, привязанной к процессору, с помощью промисов

Часто, когда разработчики узнают об эффекте блокировки от задач, связанных с процессором, они обращаются к обещаниям сделать код неблокирующим. Этот инстинкт проистекает из знания об использовании неблокирующих методов ввода-вывода на основе обещаний, таких как readFile() и writeFile(). Но, как вы уже знаете, операции ввода-вывода используют скрытые потоки Node.js, в отличие от задач, привязанных к процессору. Тем не менее, в этом разделе вы завернете задачу, привязанную к процессору, в промис, пытаясь сделать ее неблокирующей. Это не сработает, но поможет вам увидеть ценность использования рабочих потоков, что вы сделаете в следующем разделе.

Снова откройте файл index.js в своем редакторе:

  1. nano index.js

В файле index.js удалите выделенный код, содержащий задачу с интенсивным использованием ЦП:

...
app.get("/blocking", async (req, res) => {
  let counter = 0;
  for (let i = 0; i < 20_000_000_000; i++) {
    counter++;
  }
  res.status(200).send(`result is ${counter}`);
});
...

Затем добавьте следующий выделенный код, содержащий функцию, которая возвращает обещание:

...
function calculateCount() {
  return new Promise((resolve, reject) => {
    let counter = 0;
    for (let i = 0; i < 20_000_000_000; i++) {
      counter++;
    }
    resolve(counter);
  });
}

app.get("/blocking", async (req, res) => {
  res.status(200).send(`result is ${counter}`);
}

Функция calculateCount() теперь содержит вычисления, которые вы выполняли в функции-обработчике /blocking. Функция возвращает обещание, которое инициализируется синтаксисом new Promise. Обещание принимает обратный вызов с параметрами resolve и reject, которые обрабатывают успех или неудачу. Когда цикл for завершает работу, промис разрешается со значением в переменной counter.

Затем вызовите функцию calculateCount() в функции-обработчике /blocking/ в файле index.js:

app.get("/blocking", async (req, res) => {
  const counter = await calculateCount();
  res.status(200).send(`result is ${counter}`);
});

Здесь вы вызываете функцию calculateCount() с префиксом ключевого слова await, чтобы дождаться разрешения промиса. Как только промис разрешается, переменной counter присваивается разрешенное значение.

Ваш полный код теперь будет выглядеть следующим образом:

const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

function calculateCount() {
  return new Promise((resolve, reject) => {
    let counter = 0;
    for (let i = 0; i < 20_000_000_000; i++) {
      counter++;
    }
    resolve(counter);
  });
}

app.get("/blocking", async (req, res) => {
  const counter = await calculateCount();
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

Сохраните и выйдите из файла, затем снова запустите сервер:

  1. node index.js

В веб-браузере откройте http://localhost:3000/blocking и, когда он загрузится, быстро перезагрузите вкладки http://localhost:3000/non-blocking. Как вы заметили, маршруты non-blocking по-прежнему затронуты, и все они будут ждать завершения загрузки маршрута /blocking. Поскольку маршруты по-прежнему затрагиваются, промисы не заставляют код JavaScript выполняться параллельно и не могут использоваться для того, чтобы сделать задачи, связанные с процессором, неблокирующими.

При этом остановите сервер приложений с помощью CTRL+C.

Теперь, когда вы знаете, что промисы не предоставляют никакого механизма, позволяющего сделать задачи, связанные с процессором, неблокирующими, вы будете использовать модуль Node.js worker-threads, чтобы выгрузить задачу, связанную с процессором, в отдельный поток.

Разгрузка задачи, связанной с ЦП, с помощью модуля рабочих потоков

В этом разделе вы разгрузите задачу, интенсивно использующую ЦП, в другой поток, используя модуль worker-threads, чтобы избежать блокировки основного потока. Для этого вы создадите файл worker.js, который будет содержать задачу, интенсивно использующую ЦП. В файле index.js вы будете использовать модуль worker-threads для инициализации потока и запуска задачи в файле worker.js. выполняться параллельно основному потоку. После завершения задачи рабочий поток отправит сообщение с результатом обратно в основной поток.

Для начала убедитесь, что у вас есть 2 или более ядер, используя команду nproc:

  1. nproc
Output
4

Если он показывает два или более ядер, вы можете продолжить этот шаг.

Затем создайте и откройте файл worker.js в текстовом редакторе:

  1. nano worker.js

В файл worker.js добавьте следующий код, чтобы импортировать модуль worker-threads и выполнить задачу, интенсивно использующую ЦП:

const { parentPort } = require("worker_threads");

let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
  counter++;
}

Первая строка загружает модуль worker_threads и извлекает класс parentPort. Класс предоставляет методы, которые вы можете использовать для отправки сообщений в основной поток. Затем у вас есть задача с интенсивным использованием ЦП, которая в настоящее время находится в функции calculateCount() в файле index.js. Позже на этом шаге вы удалите эту функцию из index.js.

После этого добавьте выделенный код ниже:

const { parentPort } = require("worker_threads");

let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
  counter++;
}

parentPort.postMessage(counter);

Здесь вы вызываете метод postMessage() класса parentPort, который отправляет в основной поток сообщение, содержащее результат задачи, привязанной к ЦП, хранящейся в переменная счетчика.

Сохраните и выйдите из файла. Откройте index.js в текстовом редакторе:

  1. nano index.js

Поскольку у вас уже есть задача с привязкой к ЦП в worker.js, удалите выделенный код из index.js:

const express = require("express");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

function calculateCount() {
  return new Promise((resolve, reject) => {
    let counter = 0;
    for (let i = 0; i < 20_000_000_000; i++) {
      counter++;
    }
    resolve(counter);
  });
}

app.get("/blocking", async (req, res) => {
  const counter = await calculateCount();
  res.status(200).send(`result is ${counter}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

Затем в обратном вызове app.get(/blocking) добавьте следующий код для инициализации потока:

const express = require("express");
const { Worker } = require("worker_threads");
...
app.get("/blocking", async (req, res) => {
  const worker = new Worker("./worker.js");
  worker.on("message", (data) => {
    res.status(200).send(`result is ${data}`);
  });
  worker.on("error", (msg) => {
    res.status(404).send(`An error occurred: ${msg}`);
  });
});
...

Сначала вы импортируете модуль worker_threads и распаковываете класс Worker. В обратном вызове app.get(/blocking) вы создаете экземпляр Worker, используя ключевое слово new, за которым следует вызов Worker с путем к файлу worker.js в качестве аргумента. Это создает новый поток, и код в файле worker.js начинает выполняться в потоке на другом ядре.

После этого вы прикрепляете событие к экземпляру worker, используя метод on(message) для прослушивания события сообщения. Когда получено сообщение, содержащее результат из файла worker.js, оно передается в качестве параметра обратного вызова метода, который возвращает пользователю ответ, содержащий результат задачи, связанной с процессором.

Затем вы прикрепляете другое событие к рабочему экземпляру, используя метод on(error) для прослушивания события ошибки. Если возникает ошибка, обратный вызов возвращает пользователю ответ 404, содержащий сообщение об ошибке.

Ваш полный файл теперь будет выглядеть следующим образом:

const express = require("express");
const { Worker } = require("worker_threads");

const app = express();
const port = process.env.PORT || 3000;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

app.get("/blocking", async (req, res) => {
  const worker = new Worker("./worker.js");
  worker.on("message", (data) => {
    res.status(200).send(`result is ${data}`);
  });
  worker.on("error", (msg) => {
    res.status(404).send(`An error occurred: ${msg}`);
  });
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

Сохраните и выйдите из файла, затем запустите сервер:

  1. node index.js

Снова откройте вкладку http://localhost:3000/blocking в веб-браузере. Прежде чем загрузка завершится, обновите все вкладки http://localhost:3000/non-blocking. Теперь вы должны заметить, что они загружаются мгновенно, не дожидаясь завершения загрузки маршрута /blocking. Это связано с тем, что задача, связанная с ЦП, переносится в другой поток, а основной поток обрабатывает все входящие запросы.

Теперь остановите сервер с помощью CTRL+C.

Теперь, когда вы можете сделать задачу, интенсивно использующую ЦП, неблокирующей с помощью рабочего потока, вы будете использовать четыре рабочих потока для повышения производительности задачи, интенсивно использующей ЦП.

Оптимизация задачи с интенсивным использованием ЦП с использованием четырех рабочих потоков

В этом разделе вы разделите задачу, интенсивно использующую ЦП, между четырьмя рабочими потоками, чтобы они могли завершить задачу быстрее и сократить время загрузки маршрута /blocking.

Чтобы над одной и той же задачей работало больше рабочих потоков, вам потребуется разделить задачи. Поскольку задача включает в себя циклы 20 миллиардов раз, вы разделите 20 миллиардов на количество потоков, которые вы хотите использовать. В данном случае это 4. Вычисление 20_000_000_000/4 даст результат 5_000_000_000. Таким образом, каждый поток будет циклически переходить от 0 к 5_000_000_000 и увеличивать counter на 1. Когда каждый поток завершается, он отправляет сообщение в основной поток, содержащее результат. Как только основной поток получит сообщения от всех четырех потоков по отдельности, вы объедините результаты и отправите ответ пользователю.

Вы также можете использовать тот же подход, если у вас есть задача, которая перебирает большие массивы. Например, если вы хотите изменить размер 800 изображений в каталоге, вы можете создать массив, содержащий все пути к файлам изображений. Затем разделите 800 на 4 (количество потоков) и заставьте каждый поток работать в определенном диапазоне. Первый поток изменит размер изображений с индекса массива 0 на 199, второй поток изменит индекс 200 на 399 и скоро.

Во-первых, убедитесь, что у вас есть четыре или более ядер:

  1. nproc
Output
4

Сделайте копию файла worker.js с помощью команды cp:

  1. cp worker.js four_workers.js

Текущие файлы index.js и worker.js останутся нетронутыми, чтобы вы могли запустить их снова, чтобы позже сравнить их производительность с изменениями в этом разделе.

Затем откройте файл four_workers.js в текстовом редакторе:

  1. nano four_workers.js

В файле four_workers.js добавьте выделенный код для импорта объекта workerData:

const { workerData, parentPort } = require("worker_threads");

let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
  counter++;
}

parentPort.postMessage(counter);

Во-первых, вы извлекаете объект WorkerData, который будет содержать данные, переданные из основного потока при его инициализации (что вы скоро сделаете в файле index.js). . Объект имеет свойство thread_count, которое содержит количество потоков, равное 4. Далее в цикле for значение 20_000_000_000 делится на 4, в результате чего получается 5_000_000_000.

Сохраните и закройте файл, затем скопируйте файл index.js:

  1. cp index.js index_four_workers.js

Откройте файл index_four_workers.js в своем редакторе:

  1. nano index_four_workers.js

В файле index_four_workers.js добавьте выделенный код для создания экземпляра потока:

...
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
...
function createWorker() {
  return new Promise(function (resolve, reject) {
    const worker = new Worker("./four_workers.js", {
      workerData: { thread_count: THREAD_COUNT },
    });
  });
}

app.get("/blocking", async (req, res) => {
  ...
})
...

Во-первых, вы определяете константу THREAD_COUNT, содержащую количество потоков, которые вы хотите создать. Позже, когда у вас будет больше ядер на вашем сервере, масштабирование будет включать изменение значения THREAD_COUNT на количество потоков, которые вы хотите использовать.

Затем функция createWorker() создает и возвращает обещание. В обратном вызове обещания вы инициализируете новый поток, передавая классу Worker путь к файлу four_workers.js в качестве первого аргумента. Затем вы передаете объект в качестве второго аргумента. Затем вы назначаете объекту свойство workerData, значением которого является другой объект. Наконец, вы назначаете объекту свойство thread_count, значением которого является количество потоков в константе THREAD_COUNT. Объект workerData — это объект, на который вы ссылались ранее в файле workers.js.

Чтобы убедиться, что промис разрешается или выдает ошибку, добавьте следующие выделенные строки:

...
function createWorker() {
  return new Promise(function (resolve, reject) {
    const worker = new Worker("./four_workers.js", {
      workerData: { thread_count: THREAD_COUNT },
    });
    worker.on("message", (data) => {
      resolve(data);
    });
    worker.on("error", (msg) => {
      reject(`An error ocurred: ${msg}`);
    });
  });
}
...

Когда рабочий поток отправляет сообщение в основной поток, обещание разрешается с возвращенными данными. Однако при возникновении ошибки обещание возвращает сообщение об ошибке.

Теперь, когда вы определили функцию, которая инициализирует новый поток и возвращает данные из потока, вы будете использовать функцию в app.get(/blocking) для создания новых потоков.

Но сначала удалите следующий выделенный код, так как вы уже определили эту функциональность в функции createWorker():

...
app.get("/blocking", async (req, res) => {
  const worker = new Worker("./worker.js");
  worker.on("message", (data) => {
    res.status(200).send(`result is ${data}`);
  });
  worker.on("error", (msg) => {
    res.status(404).send(`An error ocurred: ${msg}`);
  });
});
...

После удаления кода добавьте следующий код для инициализации четырех рабочих потоков:

...
app.get("/blocking", async (req, res) => {
  const workerPromises = [];
  for (let i = 0; i < THREAD_COUNT; i++) {
    workerPromises.push(createWorker());
  }
});
...

Сначала вы создаете переменную workerPromises, содержащую пустой массив. Затем вы выполняете итерацию столько раз, сколько указано в THREAD_COUNT, то есть 4. Во время каждой итерации вы вызываете функцию createWorker() для создания нового потока. Затем вы помещаете объект обещания, который возвращает функция, в массив workerPromises, используя метод JavaScript push. Когда цикл завершится, workerPromises будет иметь четыре объекта обещания, каждый из которых будет возвращен в результате четырехкратного вызова функции createWorker().

Теперь добавьте следующий выделенный ниже код, чтобы дождаться разрешения промисов и вернуть ответ пользователю:

app.get("/blocking", async (req, res) => {
  const workerPromises = [];
  for (let i = 0; i < THREAD_COUNT; i++) {
    workerPromises.push(createWorker());
  }

  const thread_results = await Promise.all(workerPromises);
  const total =
    thread_results[0] +
    thread_results[1] +
    thread_results[2] +
    thread_results[3];
  res.status(200).send(`result is ${total}`);
});

Поскольку массив workerPromises содержит промисы, возвращаемые вызовом createWorker(), вы добавляете префикс Promise.all() к методу await и вызовите метод all() с аргументом workerPromises. Метод Promise.all() ожидает разрешения всех промисов в массиве. Когда это происходит, переменная thread_results содержит значения, разрешенные промисами. Поскольку расчеты были разделены между четырьмя рабочими процессами, вы суммируете их все вместе, получая каждое значение из thread_results, используя синтаксис записи скобок. После добавления вы возвращаете общую стоимость на страницу.

Теперь ваш полный файл должен выглядеть так:

const express = require("express");
const { Worker } = require("worker_threads");

const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;

app.get("/non-blocking/", (req, res) => {
  res.status(200).send("This page is non-blocking");
});

function createWorker() {
  return new Promise(function (resolve, reject) {
    const worker = new Worker("./four_workers.js", {
      workerData: { thread_count: THREAD_COUNT },
    });
    worker.on("message", (data) => {
      resolve(data);
    });
    worker.on("error", (msg) => {
      reject(`An error ocurred: ${msg}`);
    });
  });
}

app.get("/blocking", async (req, res) => {
  const workerPromises = [];
  for (let i = 0; i < THREAD_COUNT; i++) {
    workerPromises.push(createWorker());
  }
  const thread_results = await Promise.all(workerPromises);
  const total =
    thread_results[0] +
    thread_results[1] +
    thread_results[2] +
    thread_results[3];
  res.status(200).send(`result is ${total}`);
});

app.listen(port, () => {
  console.log(`App listening on port ${port}`);
});

Сохраните и закройте файл. Прежде чем запускать этот файл, сначала запустите index.js, чтобы измерить время отклика:

  1. node index.js

Затем откройте новый терминал на локальном компьютере и введите следующую команду curl, которая измеряет, сколько времени требуется для получения ответа от маршрута /blocking:

  1. time curl --get http://localhost:3000/blocking

Команда time измеряет, как долго выполняется команда curl. Команда curl отправляет HTTP-запрос на указанный URL-адрес, а параметр --get указывает curl выполнить GET запрос.

Когда команда запустится, ваш вывод будет выглядеть примерно так:

Output
real 0m28.882s user 0m0.018s sys 0m0.000s

Выделенный вывод показывает, что для получения ответа требуется около 28 секунд, что может отличаться на вашем компьютере.

Затем остановите сервер с помощью CTRL+C и запустите файл index_four_workers.js:

  1. node index_four_workers.js

Снова посетите маршрут /blocking во втором терминале:

  1. time curl --get http://localhost:3000/blocking

Вы увидите вывод, соответствующий следующему:

Output
real 0m8.491s user 0m0.011s sys 0m0.005s

Вывод показывает, что это занимает около 8 секунд, что означает, что вы сокращаете время загрузки примерно на 70%.

Вы успешно оптимизировали задачу, связанную с ЦП, с помощью четырех рабочих потоков. Если у вас есть машина с более чем четырьмя ядрами, обновите THREAD_COUNT до этого числа, и вы еще больше сократите время загрузки.

Заключение

В этой статье вы создали приложение Node с задачей, привязанной к процессору, которая блокирует основной поток. Затем вы попытались сделать задачу неблокирующей с помощью промисов, но безуспешно. После этого вы использовали модуль worker_threads, чтобы разгрузить задачу, привязанную к процессору, в другой поток, чтобы сделать ее неблокирующей. Наконец, вы использовали модуль worker_threads для создания четырех потоков, чтобы ускорить задачу, интенсивно использующую ЦП.

В качестве следующего шага см. Как кодировать в Node.js.