1. ホーム
  2. python

[解決済み] Python asyncioで並行処理を制限する方法とは?

2023-05-15 23:26:31

質問

ダウンロードするリンクの束があり、それぞれのリンクがダウンロードに異なる時間を要すると仮定します。そして、私は最大3つの接続のみを使用してダウンロードすることを許可されています。今、私はこれをasyncioを使用して効率的に行うことを保証したいと思います。

私が達成しようとしていることは次のとおりです。任意の時点で、私は少なくとも3つのダウンロードが実行されていることを確認しようとします。

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

数字はダウンロードリンク、ハイフンはダウンロード待ちを表しています。

今使っているコードは以下の通りです。

from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

そして、出力は期待通りです。

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

しかし、ここで質問です。

  1. 現時点では、ダウンロードが完了するまでメイン関数を実行し続けるために、単に 9 秒間待機しています。最後のダウンロードが完了するのを待ってから main 関数を終了する前に最後のダウンロードが完了するのを待つ効率的な方法はありますか?(私は asyncio.wait があるのは知っていますが、動作させるためにはすべてのタスクの参照を保存する必要があります)

  2. この種のタスクを行う良いライブラリは何ですか?javascriptには多くの非同期ライブラリがあることは知っていますが、Pythonはどうでしょうか?

編集する 2. 一般的な非同期パターンをケアする良いライブラリは何ですか?(例えば 非同期 )

どのように解決するのですか?

この回答の残りの部分を読む前に、asyncioでこの並列タスクの数を制限する慣用的な方法は、次のように使用されていることに注意してください。 asyncio.Semaphore で示されるように ミハイルの答え で、エレガントに抽象化された アンドレイの回答 . この回答には、同じことを達成するための作業、しかし少し複雑な方法が含まれています。この回答を残しているのは、この方法がセマフォよりも利点がある場合があるからです。特に、行うべき作業が非常に大きいか、無制限であり、すべてのコルーチンを事前に作成できない場合です。その場合、2番目の(キューベースの)解決策は、この答えが望むものです。しかし、aiohttpによる並列ダウンロードのような、ほとんどの通常の状況では、代わりにセマフォを使用する必要があります。


基本的には、固定サイズの プール のダウンロードタスクが必要です。 asyncio にはあらかじめタスクプールが用意されていませんが、作るのは簡単です。単にタスクのセットを保持して、それが制限を超えて大きくならないようにするだけです。質問ではそのルートに進むことに消極的であると述べられていますが、コードははるかにエレガントなものに仕上がります。

import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

代替案としては、固定サイズのスレッドプールのように、ダウンロードを行うコルーチンを一定数作成し、そのコルーチンに対して asyncio.Queue . これにより、手動でダウンロードの数を制限する必要がなくなります。 download() :

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

もう一つの質問に関しては、明らかな選択として aiohttp .