1. ホーム
  2. multithreading

[解決済み] Pythonでループ内の演算をマルチスレッド化する方法

2022-03-08 17:01:40

質問

非常に大きなリストがあり、次のような操作を行っているとします。

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

私の問題は2つあります。

  • 項目が多い
  • api.my_operationが返るのに時間がかかる

マルチスレッドでapi.my_operationを一度にたくさん回して、5個とか10個とか100個とかのアイテムを一度に処理したいんです。

もしmy_operation()が例外を返したら(その項目はすでに処理してしまったかもしれないので)、それはそれでOKです。それは何も壊さない。ループは次の項目に進むことができます。

備考 Python 2.7.3用です。

解決方法は?

まず、Pythonでは、コードがCPUバウンドしている場合、マルチスレッドは役に立ちません。ですから、スレッドではなく、プロセスを使う必要があります。

IOバウンド、つまりネットワークやディスクのコピーなどを待っているため、「戻るのに時間がかかる」場合はその限りではありません。これについては後で説明します。


次に、5個、10個、100個のアイテムを一度に処理する方法は、5個、10個、100個のワーカーのプールを作成し、ワーカーがサービスするキューにアイテムを入れることです。幸いなことに、stdlib の multiprocessing concurrent.futures ライブラリの両方が、あなたのために詳細のほとんどを包み込んでくれます。

伝統的なプログラミングでは前者の方が強力で柔軟性があり、未来待ちを構成する必要がある場合は後者の方がシンプルになります。(この場合、最もわかりやすい実装は、それぞれ3行で futures で4行 multiprocessing .)

2.6-2.7、3.0-3.1を使っている場合。 futures はビルトインされていません。 PyPI ( pip install futures ).


最後に、ループの繰り返し全体を関数呼び出しに変えることができれば、並列化するのはかなり簡単になります。 map ということで、まずそれをやってみましょう。

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')


まとめること

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)


比較的小さなジョブがたくさんある場合、マルチプロセシングのオーバーヘッドが利益を押し上げる可能性があります。これを解決する方法は、作業をより大きなジョブにバッチアップすることです。例えば、(使用する grouper から itertools レシピ をコピーして自分のコードに貼り付けるか、あるいは more-itertools プロジェクトに参加しています)。

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)


最後に、あなたのコードがIOバインドされている場合はどうでしょうか?その場合、スレッドはプロセスと同じように優れており、オーバーヘッドが少なくなります(制限も少なくなりますが、このような場合、制限の影響は通常ありません)。この「オーバーヘッドが少ない」というのは、スレッドではバッチ処理が必要ないけれども、プロセスでは必要である、ということを意味します。

では、プロセスの代わりにスレッドを使うにはどうしたらいいのでしょうか。それは、次のように変更するだけです。 ProcessPoolExecutorThreadPoolExecutor .

もしあなたのコードがCPUバウンドなのかIOバウンドなのかわからない場合は、両方の方法で試してみてください。


<ブロッククオート

Pythonスクリプト内の複数の関数に対してこの操作を行うことはできますか?例えば、コード内の別の場所に並列化したいforループがあった場合。同じスクリプトで2つのマルチスレッド関数を実行することは可能でしょうか?

実は、2種類の方法があるんです。

まず、同じ(スレッドやプロセスの)エグゼキュータを共有し、複数の場所から利用しても問題ありません。タスクやフューチャーのポイントは、自己完結していることです。どこで実行されるかは気にせず、キューに入れ、最終的に答えが返ってくればいいのです。

また、同じプログラム内に2つのエグゼキュータを用意しても問題ありません。この場合、パフォーマンスが犠牲になります。両方のエグゼキュータを同時に使用すると、8コアで16ビジー・スレッドを実行することになり、コンテキスト・スイッチングが発生します。しかし、2つのエグゼキュータが同時にビジー状態になることはほとんどなく、コードが非常にシンプルになるため、これを行う価値がある場合もあります。あるいは、片方のエグゼキュータが非常に大きなタスクを実行していて完了までに時間がかかり、もう片方が非常に小さなタスクを実行しており、できるだけ早く完了する必要があるような場合です。

もし、あなたのプログラムにどれが適切かわからない場合は、通常、最初の方です。