[解決済み] マルチプロセッシングによるCelery並列分散タスク
質問
CPUに負荷のかかるCeleryのタスクがあります。多くのEC2インスタンスですべての処理能力(コア)を使って、この仕事を速く終わらせたいと考えています。 (マルチプロセシングによるCelery並列分散タスク) と思います。 ) .
用語は スレッディング , マルチプロセッシング , 分散コンピューティング , 分散型並列処理 などは、私がもっと理解しようと思っている用語です。
タスクの例です。
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
上記のコードを使って (可能であれば例付きで) この1つのタスクがクラウド上の利用可能なすべてのマシン間ですべてのコンピューティングCPUパワーを利用して分割されるようにすることで、Celeryを使用してこのタスクを分散することについて、どのように前になるだろうか?
どのように解決するのですか?
あなたの目標は
- 多くのマシンに作業を分散させる(分散 コンピューティング/分散型並列処理)
- 与えられたマシン上の作業をすべての CPU に分散させる (マルチプロセシング/スレッディング)
Celeryはこれら2つのことをかなり簡単に行うことができます。最初に理解しておくべきことは、それぞれのcelery workerは がデフォルトで設定されていることです。 で設定されており、システムで利用可能な CPU コアの数だけタスクが実行されます。
同時実行性(concurrency)とは、タスクを処理するために使用されるプレフォークワーカープロセスの数です。 タスクの同時処理に使われるプレフォークワーカープロセスの数で、これらすべてが忙しいときに新しい タスクが処理される前に、いずれかのタスクが終了するのを待つ必要があります。 を待つ必要があります。
デフォルトの同時実行数は、そのマシンのCPUの数(コアを含む (コアを含む) ですが、-cオプションで任意の数を指定することができます。 最適な数値はいくつかの要因に依存するため、推奨される値はありません。 しかし、タスクがほとんどI/Oに依存している場合、これを増やすことができます。 CPUの数を2倍以上追加することが実験的に示されています。 2倍以上のCPUを追加してもほとんど効果がなく、かえってパフォーマンスを低下させる可能性があることが実験的に分かっています。 パフォーマンスを低下させる可能性があります。
つまり、個々のタスクは、複数のCPU/コアを使用するためにマルチプロセシング/スレッディングを使用することを心配する必要がないのです。その代わり、celeryは利用可能な各CPUを使用するために十分なタスクを同時に実行します。
これで、次のステップは
list_of_millions_of_ids
. ひとつは、各タスクがひとつのIDを処理するようにし、N個のタスクを実行し、ここで
N == len(list_of_millions_of_ids)
. これは、あるワーカーが早く終わってしまって、ただ待っているだけということがないように、すべてのタスクに均等に仕事が行き渡るように保証するものです。これは (John Doe が言っていたように) celery の
group
.
tasks.py:
@app.task
def process_ids(item):
id = item #long complicated equation here
database.objects(newid=id).save()
そして、タスクを実行するために
from celery import group
from tasks import process_id
jobs = group(process_ids(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()
もうひとつの方法は、リストをより小さな断片に分割し、ワーカーに分配することです。この方法では、一部のワーカーが待機している間に他のワーカーが作業を続けることになり、サイクルを無駄にする危険性があります。しかし celery ドキュメントノート には、この懸念はしばしば根拠がないと書かれています。
タスクをチャンクすると並列性が低下するのではと心配する人もいるかもしれません。 しかし、これはビジーなクラスタではほとんど当てはまりません。 メッセージングのオーバーヘッドを避けることができるので、実際には、パフォーマンスをかなり向上させることができます。 パフォーマンスをかなり向上させることができます。
つまり、リストをチャンクして各タスクにチャンクを配布すると、メッセージングのオーバーヘッドが減少するため、パフォーマンスが向上することがわかります。この方法では、各 ID を計算してリストに格納し、一度に 1 つの ID を処理するのではなく、完了したらリスト全体を DB に追加することで、データベースへの負荷を少し軽減することもできます。チャンキングアプローチは次のようなものです。
tasks.py:
@app.task
def process_ids(items):
for item in items:
id = item #long complicated equation here
database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
そして、タスクを開始するために
from tasks import process_ids
jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()
どのようなチャンキングサイズが最良の結果をもたらすか、少し実験することができます。メッセージングのオーバーヘッドを削減しつつ、ワーカーが他のワーカーよりもずっと速くチャンクを終えて、何もすることがなくただ待っているということがないように、サイズを十分に小さくしておくスイートスポットを見つけたいのです。
関連
-
[解決済み] マルチプロセシングpool.mapを複数の引数で使用する方法
-
[解決済み] pipでPythonの全パッケージをアップグレードする方法
-
[解決済み】2つのリストを並列に反復処理する方法は?
-
[解決済み] PythonでファイルのMD5チェックサムを計算するには?重複
-
[解決済み] Pythonの構文に新しいステートメントを追加することはできますか?
-
[解決済み] スペースがないテキストを単語のリストに分割する方法
-
[解決済み] Cythonのコードを含むPythonパッケージはどのように構成すればよいのでしょうか?
-
[解決済み] 異なる順序で同じ要素を持つ2つのJSONオブジェクトを等しく比較するには?
-
[解決済み] matplotlib でプロットの軸、目盛、ラベルの色を変更する方法
-
[解決済み] djangoのQueryDictをPythonのDictに変更するには?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Pythonのマルチプロセッシングプールimap_unorderedの呼び出しの進捗を表示しますか?
-
[解決済み] Pythonの構文に新しいステートメントを追加することはできますか?
-
[解決済み] Django のテストデータベースをメモリ上だけで動作させるには?
-
[解決済み] Pythonのインスタンス変数とクラス変数
-
[解決済み] 文字列のリストを内容に基づいてフィルタリングする
-
[解決済み] CSVデータを処理する際、1行目のデータを無視する方法を教えてください。
-
[解決済み] Pythonの文字列書式をリストで使う
-
[解決済み] あるメソッドが複数の引数のうち1つの引数で呼び出されたことを保証する
-
[解決済み] Pythonの辞書にあるスレッドセーフについて
-
[解決済み] Python の sorted() はどのようなアルゴリズムを使っているのですか?重複