[解決済み] マルチプロセシング - パイプとキュー
質問
におけるキューとパイプの基本的な違いは何ですか? Pythonのマルチプロセッシングパッケージ ?
どのような場面でどちらを選ぶべきでしょうか? どのような場合に
Pipe()
? どのような場合に
Queue()
?
解決方法は?
使用するタイミング
2点以上の通信が必要な場合は
Queue()
.
絶対的な性能が必要な場合は
Pipe()
の方がはるかに速いので
Queue()
の上に構築されています。
Pipe()
.
パフォーマンスベンチマーキング
2つのプロセスを起動し、その間にできるだけ早くメッセージを送りたい場合を考えてみましょう。 を使った類似のテスト間のドラッグレースによるタイミング結果です。
Pipe()
と
Queue()
... Ubuntu 11.10 と Python 2.7.2 が動作する ThinkpadT61 上での話です。
参考までに、以下の結果を投じてみました。
JoinableQueue()
をオマケにつけています。
JoinableQueue()
は、以下のような場合にタスクを処理します。
queue.task_done()
が呼び出されると (特定のタスクについて知ることもなく、キューの中の未完成のタスクをカウントするだけです)、そのため
queue.join()
は、作業が終了したことを知ることができます。
それぞれのコードは、この回答の一番下にあります...
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
要約すると
Pipe()
の方が3倍くらい速いです。
Queue()
. のことは考えないでください。
JoinableQueue()
どうしても必要な場合は別ですが。
特典映像2
マルチプロセッシングは、情報の流れに微妙な変化をもたらすので、いくつかの近道を知らない限り、デバッグは困難です。 例えば、辞書のインデックスを作成するスクリプトが、多くの条件下では正常に動作するが、特定の入力ではまれに失敗することがあります。
通常、Pythonのプロセス全体がクラッシュすると、失敗の手がかりが得られます。しかし、マルチプロセシング関数がクラッシュした場合、コンソールに出力される未承諾のクラッシュトレースバックは得られません。 未知のマルチプロセッシングのクラッシュを追跡するのは、何がプロセスをクラッシュさせたのかの手がかりがないと難しいのです。
私が発見したマルチプロセッシングのクラッシュ情報を追跡する最も簡単な方法は、マルチプロセッシング関数全体を
try
/
except
を使用し
traceback.print_exc()
:
import traceback
def run(self, args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
さて、クラッシュを見つけると次のように表示されます。
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
ソースコードです。
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
関連
-
python call matlab メソッドの詳細
-
[解決済み】ImportError: sklearn.cross_validation という名前のモジュールがない。
-
[解決済み] 'DataFrame' オブジェクトに 'sort' 属性がない
-
[解決済み] callとapplyの違いは何ですか?
-
[解決済み] SQLiteのINSERT/per-secondのパフォーマンスを向上させる
-
[解決済み] 最小限の驚き」と「変更可能なデフォルトの引数
-
[解決済み] リストを均等な大きさの塊に分割するには?
-
[解決済み] Pythonで標準エラー出力するには?
-
[解決済み] クラスター化インデックスと非クラスター化インデックスの実際の意味は何ですか?
-
[解決済み] Pythonの旧スタイルのクラスと新スタイルのクラスの違いは何ですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
Pythonコードの可読性を向上させるツール「pycodestyle」の使い方を詳しく解説します
-
PythonによるExcelファイルの一括操作の説明
-
Python LeNetネットワークの説明とpytorchでの実装
-
Pythonの@decoratorsについてまとめてみました。
-
[解決済み】RuntimeWarning: 割り算で無効な値が発生しました。
-
[解決済み] _tkinter.TclError: 表示名がなく、$DISPLAY環境変数もない。
-
[解決済み】Pythonスクリプトで「Expected 2D array, got 1D array instead: 」というエラーが発生?
-
[解決済み】Pythonでgoogle APIのJSONコードを読み込むとエラーになる件
-
[解決済み】IndexError: invalid index to scalar variableを修正する方法
-
[解決済み】Pythonでマルチプロセシングを使用しているとき、どのようにログを取るべきですか?