Поиск по сайту:

Пример многопроцессорной обработки Python


В нашем предыдущем уроке мы узнали о примере Python CSV. В этом уроке мы собираемся изучить многопроцессорность Python на примерах.

Многопроцессорность Python

В настоящее время параллельной обработке уделяется все больше внимания. Если вы до сих пор не знаете о параллельной обработке, узнайте из википедии. По мере того, как производители процессоров начинают добавлять в свои процессоры все больше и больше ядер, создание параллельного кода — отличный способ повысить производительность. Python представил модуль многопроцессорности, позволяющий нам писать параллельный код. Чтобы понять основную мотивацию этого модуля, мы должны знать некоторые основы параллельного программирования. Мы надеемся, что прочитав эту статью, вы сможете получить некоторые знания по этой теме.

Многопроцессорный процесс Python, очередь и блокировки

В многопроцессорном модуле python есть множество классов для создания параллельной программы. Среди них есть три основных класса: Process, Queue и Lock. Эти классы помогут вам построить параллельную программу. Но прежде чем рассказывать о них, давайте начнем эту тему с простого кода. Чтобы сделать параллельную программу полезной, вы должны знать, сколько ядер в вашем компьютере. Модуль Python Multiprocessing позволяет вам это знать. Следующий простой код напечатает количество ядер на вашем компьютере.

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

Класс многопроцессорного процесса Python

Многопроцессорный класс Python Process — это абстракция, которая настраивает другой процесс Python, предоставляет ему возможность запускать код и позволяет родительскому приложению управлять выполнением. Есть две важные функции, принадлежащие классу Process — функция start() и join(). Сначала нам нужно написать функцию, которую будет запускать процесс. Затем нам нужно создать экземпляр объекта процесса. Если мы создадим объект процесса, ничего не произойдет, пока мы не скажем ему начать обработку с помощью функции start(). Затем процесс запустится и вернет результат. После этого мы сообщаем процессу о завершении с помощью функции join(). Без вызова функции join() процесс будет простаивать и не завершится. Поэтому, если вы создадите много процессов и не завершите их, вы можете столкнуться с нехваткой ресурсов. Тогда вам может понадобиться убить их вручную. Одна важная вещь: если вы хотите передать какой-либо аргумент через процесс, вам нужно использовать аргумент ключевого слова args. Следующий код будет полезен для понимания использования класса Process.

from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # instantiating process with arguments
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # complete the processes
    for proc in procs:
        proc.join()

Класс очереди многопроцессорной обработки Python

У вас есть базовые знания о компьютерной структуре данных, вы, вероятно, знаете об очереди. Модули многопроцессорной обработки Python предоставляют класс Queue, который точно соответствует структуре данных First-In-First-Out. Они могут хранить любой объект pickle Python (хотя простые лучше всего) и чрезвычайно полезны для обмена данными между процессами. Очереди особенно полезны, когда они передаются в качестве параметра целевой функции процесса, чтобы позволить процессу потреблять данные. С помощью функции put() мы можем вставлять данные в очередь, а с помощью get() мы можем получать элементы из очередей. См. следующий код для быстрого примера.

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# instantiating a queue object
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Класс блокировки многопроцессорной обработки Python

Задача класса Lock довольно проста. Это позволяет коду запрашивать блокировку, чтобы ни один другой процесс не мог выполнить аналогичный код, пока блокировка не будет снята. Таким образом, задача класса Lock в основном состоит из двух. Один - запросить блокировку, а другой - снять блокировку. Для запроса блокировки используется функция acquire(), а для снятия блокировки используется функция release().

Пример многопроцессорной обработки Python

В этом примере многопроцессорности Python мы объединим все наши знания. Допустим, у нас есть задачи, которые нужно выполнить. Чтобы выполнить эту задачу, мы будем использовать несколько процессов. Итак, мы будем поддерживать две очереди. Один будет содержать задачи, а другой будет содержать журнал выполненных задач. Затем мы создаем процессы для выполнения задачи. Обратите внимание, что класс python Queue уже синхронизирован. Это означает, что нам не нужно использовать класс Lock для блокировки доступа нескольких процессов к одному и тому же объекту очереди. Поэтому в данном случае нам не нужно использовать класс Lock. Ниже приведена реализация, в которой мы добавляем задачи в очередь, затем создаем процессы и запускаем их, а затем используем join() для завершения процессов. Наконец, мы печатаем журнал из второй очереди.

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                queue(False) function would do the same task also.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # creating processes
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # completing process
    for p in processes:
        p.join()

    # print the output
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

Многопроцессорный пул Python

Многопроцессорный пул Python можно использовать для параллельного выполнения функции с несколькими входными значениями, распределяя входные данные между процессами (параллелизм данных). Ниже приведен простой пример многопроцессорного пула Python.

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


def pool_handler():
    p = Pool(2)
    p.map(work_log, work)


if __name__ == '__main__':
    pool_handler()