1. ホーム
  2. python

[解決済み] マルチプロセシング - パイプとキュー

2022-04-20 12:33:04

質問

におけるキューとパイプの基本的な違いは何ですか? Pythonのマルチプロセッシングパッケージ ?

どのような場面でどちらを選ぶべきでしょうか? どのような場合に Pipe() ? どのような場合に Queue() ?

解決方法は?

  • A Pipe() は2つの終点しか持てない。

  • A Queue() は、複数の生産者と消費者を持つことができます。

使用するタイミング

2点以上の通信が必要な場合は Queue() .

絶対的な性能が必要な場合は Pipe() の方がはるかに速いので Queue() の上に構築されています。 Pipe() .

パフォーマンスベンチマーキング

2つのプロセスを起動し、その間にできるだけ早くメッセージを送りたい場合を考えてみましょう。 を使った類似のテスト間のドラッグレースによるタイミング結果です。 Pipe()Queue() ... Ubuntu 11.10 と Python 2.7.2 が動作する ThinkpadT61 上での話です。

参考までに、以下の結果を投じてみました。 JoinableQueue() をオマケにつけています。 JoinableQueue() は、以下のような場合にタスクを処理します。 queue.task_done() が呼び出されると (特定のタスクについて知ることもなく、キューの中の未完成のタスクをカウントするだけです)、そのため queue.join() は、作業が終了したことを知ることができます。

それぞれのコードは、この回答の一番下にあります...

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

要約すると Pipe() の方が3倍くらい速いです。 Queue() . のことは考えないでください。 JoinableQueue() どうしても必要な場合は別ですが。

特典映像2

マルチプロセッシングは、情報の流れに微妙な変化をもたらすので、いくつかの近道を知らない限り、デバッグは困難です。 例えば、辞書のインデックスを作成するスクリプトが、多くの条件下では正常に動作するが、特定の入力ではまれに失敗することがあります。

通常、Pythonのプロセス全体がクラッシュすると、失敗の手がかりが得られます。しかし、マルチプロセシング関数がクラッシュした場合、コンソールに出力される未承諾のクラッシュトレースバックは得られません。 未知のマルチプロセッシングのクラッシュを追跡するのは、何がプロセスをクラッシュさせたのかの手がかりがないと難しいのです。

私が発見したマルチプロセッシングのクラッシュ情報を追跡する最も簡単な方法は、マルチプロセッシング関数全体を try / except を使用し traceback.print_exc() :

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

さて、クラッシュを見つけると次のように表示されます。

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

ソースコードです。


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in xrange(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))


"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))


"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))