Поиск по сайту:

Как использовать Celery с RabbitMQ для постановки задач в очередь на Ubuntu VPS


Введение

Асинхронная или неблокирующая обработка — это метод отделения выполнения определенных задач от основного потока программы. Это дает вам несколько преимуществ, в том числе позволяет вашему пользовательскому коду работать без перерыва.

Передача сообщений — это метод, который программные компоненты могут использовать для связи и обмена информацией. Он может быть реализован синхронно или асинхронно и может позволить дискретным процессам взаимодействовать без проблем. Передача сообщений часто реализуется как альтернатива традиционным базам данных для этого типа использования, поскольку очереди сообщений часто реализуют дополнительные функции, обеспечивают повышенную производительность и могут полностью находиться в памяти.

Celery — это очередь задач, построенная на асинхронной системе передачи сообщений. Его можно использовать как корзину, в которую можно сбрасывать задачи программирования. Программа, передавшая задачу, может продолжать выполняться и реагировать на запросы, а позже она может опросить сельдерей, чтобы увидеть, завершены ли вычисления, и получить данные.

Хотя сельдерей написан на Python, его протокол можно реализовать на любом языке. Он может даже работать с другими языками через веб-хуки.

Внедрив очередь заданий в среду вашей программы, вы можете легко разгрузить задачи и продолжать обрабатывать взаимодействие со своими пользователями. Это простой способ повысить скорость отклика ваших приложений и избежать зависания при выполнении длительных вычислений.

В этом руководстве мы установим и реализуем очередь заданий celery, используя RabbitMQ в качестве системы обмена сообщениями на Ubuntu 12.04 VPS.

Установите компоненты

Установить сельдерей

Celery написан на Python, поэтому его легко установить так же, как мы работаем с обычными пакетами Python.

Мы будем следовать рекомендуемым процедурам обработки пакетов Python, создав виртуальную среду для установки нашей системы обмена сообщениями. Это помогает нам поддерживать стабильность нашей среды и не влиять на более крупную систему.

Установите пакет виртуальной среды Python из стандартных репозиториев Ubuntu:

sudo apt-get update
sudo apt-get install python-virtualenv

Мы создадим каталог обмена сообщениями, в котором мы будем реализовывать нашу систему:

mkdir ~/messaging
cd ~/messaging

Теперь мы можем создать виртуальную среду, в которой мы можем установить сельдерей, используя следующую команду:

virtualenv --no-site-packages venv

Настроив виртуальную среду, мы можем активировать ее, набрав:

source venv/bin/activate

Ваше приглашение изменится, чтобы отразить, что вы сейчас работаете в виртуальной среде, которую мы создали выше. Это гарантирует, что наши пакеты Python будут установлены локально, а не глобально.

Если в какой-то момент нам понадобится деактивировать среду (не сейчас), вы можете набрать:

deactivate

Теперь, когда мы активировали среду, мы можем установить celery с помощью pip:

pip install celery

Установите RabbitMQ

Celery требует агента обмена сообщениями для обработки запросов из внешнего источника. Этот агент упоминается как «брокер».

Существует довольно много вариантов брокеров, доступных на выбор, включая реляционные базы данных, базы данных NoSQL, хранилища ключей и фактические системы обмена сообщениями.

Мы будем настраивать celery для использования системы обмена сообщениями RabbitMQ, поскольку она обеспечивает надежную, стабильную производительность и хорошо взаимодействует с celery. Это отличное решение, потому что оно включает в себя функции, которые хорошо сочетаются с нашим предполагаемым использованием.

Мы можем установить RabbitMQ через репозитории Ubuntu:

sudo apt-get install rabbitmq-server

Служба RabbitMQ автоматически запускается на нашем сервере при установке.

Создайте экземпляр сельдерея

Чтобы использовать возможности очереди задач celery, нашим первым шагом после установки должно быть создание экземпляра celery. Это простой процесс импорта пакета, создания «приложения», а затем настройки задач, которые сельдерей сможет выполнять в фоновом режиме.

Давайте создадим скрипт Python внутри нашего каталога обмена сообщениями с именем tasks.py, где мы можем определить задачи, которые могут выполнять наши работники.

sudo nano ~/messaging/tasks.py

Первое, что нам нужно сделать, это импортировать функцию Celery из пакета celery:

from celery import Celery

После этого мы можем создать экземпляр приложения celery, который подключается к службе RabbitMQ по умолчанию:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

Первый аргумент функции Celery — это имя, которое будет добавлено к задачам для их идентификации.

Параметр backend — это необязательный параметр, необходимый, если вы хотите запросить состояние фоновой задачи или получить ее результаты.

Если ваши задачи представляют собой просто функции, которые выполняют некоторую работу, а затем завершают работу, не возвращая полезного значения для использования в вашей программе, вы можете не указывать этот параметр. Если эта функция требуется только для некоторых ваших задач, включите ее здесь, и мы можем отключить ее в каждом конкретном случае в дальнейшем.

Параметр broker указывает URL-адрес, необходимый для подключения к нашему брокеру. В нашем случае это сервис RabbitMQ, работающий на нашем сервере. RabbitMQ работает с использованием протокола под названием «amqp». Если RabbitMQ работает в конфигурации по умолчанию, celery не может подключаться ни к какой другой информации, кроме схемы amqp://.

Создание задач Celery

Все еще в этом файле нам теперь нужно добавить наши задачи.

Каждая задача сельдерея должна быть представлена декоратором @app.task. Это позволяет celery идентифицировать функции, к которым он может добавить свои функции очередей. После каждого декоратора мы просто создаем функцию, которую могут запускать наши воркеры.

Нашей первой задачей будет простая функция, которая выводит строку на консоль.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

Поскольку эта функция не возвращает никакой полезной информации (вместо этого она выводит ее на консоль), мы можем указать сельдерею не использовать серверную часть для хранения информации о состоянии этой задачи. Это менее сложно внутри и требует меньше ресурсов.

<пред>

app=Celery («задачи», бэкенд=«amqp», брокер=«amqp: //»)

@app.task(ignore_result=True)

Далее мы добавим еще одну функцию, которая будет генерировать простые числа (взято из RosettaCode). Это может быть длительный процесс, поэтому это хороший пример того, как мы можем работать с асинхронными рабочими процессами, когда мы ждем результата.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

Поскольку нам важно, какое значение возвращает эта функция, и поскольку мы хотим знать, когда она завершится (чтобы мы могли использовать результаты и т. д.), мы не добавляем параметр ignore_result в это второе задание.

Сохраните и закройте файл.

Запуск рабочих процессов Celery

Теперь мы можем запустить рабочие процессы, которые смогут принимать подключения от приложений. Он будет использовать только что созданный файл, чтобы узнать о задачах, которые он может выполнять.

Запустить рабочий экземпляр так же просто, как вызвать имя приложения с помощью команды celery. Мы добавим символ \& в конец нашей строки, чтобы перевести наш рабочий процесс в фоновый режим:

celery worker -A tasks &

Это запустит приложение, а затем отсоединит его от терминала, что позволит вам продолжать использовать его для других задач.

Если вы хотите запустить несколько рабочих процессов, вы можете сделать это, назвав каждого из них аргументом -n:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

%h будет заменен именем хоста, когда будет назван рабочий процесс.

Чтобы остановить рабочих, вы можете использовать команду kill. Мы можем запросить идентификатор процесса, а затем исключить рабочих на основе этой информации.

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

Это позволит рабочему завершить свою текущую задачу перед выходом.

Если вы хотите закрыть всех рабочих, не дожидаясь, пока они выполнят свои задачи, вы можете выполнить:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

Используйте очередь для обработки работы

Мы можем использовать рабочие процессы, которые мы породили, для завершения работы в фоновом режиме для наших программ.

Вместо создания целой программы для демонстрации того, как это работает, мы рассмотрим различные варианты интерпретатора Python:

python

В командной строке мы можем импортировать наши функции в среду:

from tasks import print_hello
from tasks import gen_prime

Если вы протестируете эти функции, вы увидите, что они не имеют какой-либо специальной функциональности. Первая функция печатает строку, как и ожидалось:

print_hello()
hello there

Вторая функция возвращает список простых чисел:

primes = gen_prime(1000)
print primes

Если мы дадим второй функции больший диапазон чисел для проверки, выполнение зависнет, пока оно вычисляет:

primes = gen_prime(50000)

Остановите выполнение, набрав \CTRL-C. Этот процесс явно не работает в фоновом режиме.

Чтобы получить доступ к фоновому рабочему процессу, нам нужно использовать метод .delay. Celery дополняет наши функции дополнительными возможностями. Этот метод используется для передачи функции рабочему процессу для выполнения. Он должен немедленно вернуться:

primes = gen_prime.delay(50000)

Эта задача сейчас выполняется рабочими, которых мы запустили ранее. Поскольку мы настроили параметр backend для нашего приложения, мы можем проверить статус вычисления и получить доступ к результату.

Чтобы проверить, выполнена ли задача, мы можем использовать метод .ready:

primes.ready()
False

Значение «False» означает, что задача все еще выполняется и результат еще не доступен. Когда мы получаем значение «True», мы можем что-то делать с ответом.

primes.ready()
True

Мы можем получить значение с помощью метода .get.

Если мы уже убедились, что значение вычисляется с помощью метода .ready, мы можем использовать этот метод следующим образом:

print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

Однако, если вы не использовали метод .ready до вызова .get, вы, скорее всего, захотите добавить опцию \timeout, чтобы ваша программа не t вынуждены ждать результата, что противоречит цели нашей реализации:

print primes.get(timeout=2)

Это вызовет исключение, если истечет время ожидания, которое вы можете обработать в своей программе.

Заключение

Хотя этой информации достаточно, чтобы начать использовать сельдерей в своих программах, это лишь поверхностное представление о полной функциональности этой библиотеки. Celery позволяет объединять фоновые задачи, группировать задачи и интересным образом комбинировать функции.

Хотя сельдерей написан на Python, его можно использовать с другими языками через веб-хуки. Это делает его невероятно гибким для перемещения задач в фоновый режим, независимо от выбранного вами языка.

Джастин Эллингвуд