[解決済み] Python asyncioで並行処理を制限する方法とは?
質問
ダウンロードするリンクの束があり、それぞれのリンクがダウンロードに異なる時間を要すると仮定します。そして、私は最大3つの接続のみを使用してダウンロードすることを許可されています。今、私はこれをasyncioを使用して効率的に行うことを保証したいと思います。
私が達成しようとしていることは次のとおりです。任意の時点で、私は少なくとも3つのダウンロードが実行されていることを確認しようとします。
Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----
数字はダウンロードリンク、ハイフンはダウンロード待ちを表しています。
今使っているコードは以下の通りです。
from random import randint
import asyncio
count = 0
async def download(code, permit_download, no_concurrent, downloading_event):
global count
downloading_event.set()
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
count -= 1
if count < no_concurrent and not permit_download.is_set():
permit_download.set()
async def main(loop):
global count
permit_download = asyncio.Event()
permit_download.set()
downloading_event = asyncio.Event()
no_concurrent = 3
i = 0
while i < 9:
if permit_download.is_set():
count += 1
if count >= no_concurrent:
permit_download.clear()
loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
await downloading_event.wait() # To force context to switch to download function
downloading_event.clear()
i += 1
else:
await permit_download.wait()
await asyncio.sleep(9)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
そして、出力は期待通りです。
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8
しかし、ここで質問です。
-
現時点では、ダウンロードが完了するまでメイン関数を実行し続けるために、単に 9 秒間待機しています。最後のダウンロードが完了するのを待ってから
main
関数を終了する前に最後のダウンロードが完了するのを待つ効率的な方法はありますか?(私はasyncio.wait
があるのは知っていますが、動作させるためにはすべてのタスクの参照を保存する必要があります) -
この種のタスクを行う良いライブラリは何ですか?javascriptには多くの非同期ライブラリがあることは知っていますが、Pythonはどうでしょうか?
編集する 2. 一般的な非同期パターンをケアする良いライブラリは何ですか?(例えば 非同期 )
どのように解決するのですか?
この回答の残りの部分を読む前に、asyncioでこの並列タスクの数を制限する慣用的な方法は、次のように使用されていることに注意してください。
asyncio.Semaphore
で示されるように
ミハイルの答え
で、エレガントに抽象化された
アンドレイの回答
. この回答には、同じことを達成するための作業、しかし少し複雑な方法が含まれています。この回答を残しているのは、この方法がセマフォよりも利点がある場合があるからです。特に、行うべき作業が非常に大きいか、無制限であり、すべてのコルーチンを事前に作成できない場合です。その場合、2番目の(キューベースの)解決策は、この答えが望むものです。しかし、aiohttpによる並列ダウンロードのような、ほとんどの通常の状況では、代わりにセマフォを使用する必要があります。
基本的には、固定サイズの
プール
のダウンロードタスクが必要です。
asyncio
にはあらかじめタスクプールが用意されていませんが、作るのは簡単です。単にタスクのセットを保持して、それが制限を超えて大きくならないようにするだけです。質問ではそのルートに進むことに消極的であると述べられていますが、コードははるかにエレガントなものに仕上がります。
import asyncio, random
async def download(code):
wait_time = random.randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
async def main(loop):
no_concurrent = 3
dltasks = set()
i = 0
while i < 9:
if len(dltasks) >= no_concurrent:
# Wait for some download to finish before adding a new one
_done, dltasks = await asyncio.wait(
dltasks, return_when=asyncio.FIRST_COMPLETED)
dltasks.add(loop.create_task(download(i)))
i += 1
# Wait for the remaining downloads to finish
await asyncio.wait(dltasks)
代替案としては、固定サイズのスレッドプールのように、ダウンロードを行うコルーチンを一定数作成し、そのコルーチンに対して
asyncio.Queue
. これにより、手動でダウンロードの数を制限する必要がなくなります。
download()
:
# download() defined as above
async def download_worker(q):
while True:
code = await q.get()
await download(code)
q.task_done()
async def main(loop):
q = asyncio.Queue()
workers = [loop.create_task(download_worker(q)) for _ in range(3)]
i = 0
while i < 9:
await q.put(i)
i += 1
await q.join() # wait for all tasks to be processed
for worker in workers:
worker.cancel()
await asyncio.gather(*workers, return_exceptions=True)
もう一つの質問に関しては、明らかな選択として
aiohttp
.
関連
-
[解決済み] Pythonで現在時刻を取得する方法
-
[解決済み] pipでPythonの全パッケージをアップグレードする方法
-
[解決済み】ネストされたディレクトリを安全に作成するには?
-
[解決済み】Pythonに三項条件演算子はありますか?
-
[解決済み】2つの辞書を1つの式でマージする(辞書の和をとる)には?)
-
[解決済み] PILからopenCVフォーマットへの変換
-
[解決済み] Python 2.7サポート終了?
-
[解決済み] Pythonでマルチプロセッシングキューを使うには?
-
[解決済み] CSVデータを処理する際、1行目のデータを無視する方法を教えてください。
-
[解決済み] PySparkでデータフレームのカラムをString型からDouble型に変更する方法は?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Pythonの構文に新しいステートメントを追加することはできますか?
-
[解決済み] PythonでSVGからPNGに変換する
-
[解決済み] バブルソートの宿題
-
[解決済み] 文字列のリストを内容に基づいてフィルタリングする
-
[解決済み] 範囲指定された浮動小数点数のランダムな配列を生成します。
-
[解決済み] Jupyter (IPython)ノートブックのセッションをpickleして保存する方法
-
[解決済み] 異なる順序で同じ要素を持つ2つのJSONオブジェクトを等しく比較するには?
-
[解決済み] Python Empty Generator 関数
-
[解決済み] Pythonで、ウェブサイトが404か200かを確認するためにurllibをどのように使用しますか?
-
[解決済み] Pythonの辞書にあるスレッドセーフについて