1. ホーム
  2. python

[解決済み] マルチプロセッシングによるCelery並列分散タスク

2023-06-18 03:53:26

質問

CPUに負荷のかかるCeleryのタスクがあります。多くのEC2インスタンスですべての処理能力(コア)を使って、この仕事を速く終わらせたいと考えています。 (マルチプロセシングによるCelery並列分散タスク) と思います。 ) .

用語は スレッディング , マルチプロセッシング , 分散コンピューティング , 分散型並列処理 などは、私がもっと理解しようと思っている用語です。

タスクの例です。

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

上記のコードを使って (可能であれば例付きで) この1つのタスクがクラウド上の利用可能なすべてのマシン間ですべてのコンピューティングCPUパワーを利用して分割されるようにすることで、Celeryを使用してこのタスクを分散することについて、どのように前になるだろうか?

どのように解決するのですか?

あなたの目標は

  1. 多くのマシンに作業を分散させる(分散 コンピューティング/分散型並列処理)
  2. 与えられたマシン上の作業をすべての CPU に分散させる (マルチプロセシング/スレッディング)

Celeryはこれら2つのことをかなり簡単に行うことができます。最初に理解しておくべきことは、それぞれのcelery workerは がデフォルトで設定されていることです。 で設定されており、システムで利用可能な CPU コアの数だけタスクが実行されます。

同時実行性(concurrency)とは、タスクを処理するために使用されるプレフォークワーカープロセスの数です。 タスクの同時処理に使われるプレフォークワーカープロセスの数で、これらすべてが忙しいときに新しい タスクが処理される前に、いずれかのタスクが終了するのを待つ必要があります。 を待つ必要があります。

デフォルトの同時実行数は、そのマシンのCPUの数(コアを含む (コアを含む) ですが、-cオプションで任意の数を指定することができます。 最適な数値はいくつかの要因に依存するため、推奨される値はありません。 しかし、タスクがほとんどI/Oに依存している場合、これを増やすことができます。 CPUの数を2倍以上追加することが実験的に示されています。 2倍以上のCPUを追加してもほとんど効果がなく、かえってパフォーマンスを低下させる可能性があることが実験的に分かっています。 パフォーマンスを低下させる可能性があります。

つまり、個々のタスクは、複数のCPU/コアを使用するためにマルチプロセシング/スレッディングを使用することを心配する必要がないのです。その代わり、celeryは利用可能な各CPUを使用するために十分なタスクを同時に実行します。

これで、次のステップは list_of_millions_of_ids . ひとつは、各タスクがひとつのIDを処理するようにし、N個のタスクを実行し、ここで N == len(list_of_millions_of_ids) . これは、あるワーカーが早く終わってしまって、ただ待っているだけということがないように、すべてのタスクに均等に仕事が行き渡るように保証するものです。これは (John Doe が言っていたように) celery の group .

tasks.py:

@app.task
def process_ids(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

そして、タスクを実行するために

from celery import group
from tasks import process_id

jobs = group(process_ids(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

もうひとつの方法は、リストをより小さな断片に分割し、ワーカーに分配することです。この方法では、一部のワーカーが待機している間に他のワーカーが作業を続けることになり、サイクルを無駄にする危険性があります。しかし celery ドキュメントノート には、この懸念はしばしば根拠がないと書かれています。

タスクをチャンクすると並列性が低下するのではと心配する人もいるかもしれません。 しかし、これはビジーなクラスタではほとんど当てはまりません。 メッセージングのオーバーヘッドを避けることができるので、実際には、パフォーマンスをかなり向上させることができます。 パフォーマンスをかなり向上させることができます。

つまり、リストをチャンクして各タスクにチャンクを配布すると、メッセージングのオーバーヘッドが減少するため、パフォーマンスが向上することがわかります。この方法では、各 ID を計算してリストに格納し、一度に 1 つの ID を処理するのではなく、完了したらリスト全体を DB に追加することで、データベースへの負荷を少し軽減することもできます。チャンキングアプローチは次のようなものです。

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

そして、タスクを開始するために

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

どのようなチャンキングサイズが最良の結果をもたらすか、少し実験することができます。メッセージングのオーバーヘッドを削減しつつ、ワーカーが他のワーカーよりもずっと速くチャンクを終えて、何もすることがなくただ待っているということがないように、サイズを十分に小さくしておくスイートスポットを見つけたいのです。