1. ホーム
  2. c#

[解決済み】Parallel.ForEachでawaitをネストする【重複あり

2022-04-18 03:05:07

質問

メトロアプリで、WCFの呼び出しを何度も実行する必要があります。かなりの数の呼び出しを行う必要があるため、並列ループで実行する必要があります。 問題は、WCFコールがすべて完了する前に並列ループが終了してしまうことです。

これを期待通りに動作させるには、どのようにリファクタリングすればよいでしょうか。

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

解決方法は?

の背後にある全体のアイデア Parallel.ForEach() は、スレッドの集合を持ち、各スレッドがコレクションの一部を処理することです。お気づきのように、これは async - await ここで、非同期呼び出しの間、スレッドを解放したい場合。

をブロックすることで、それを「修正」することができます。 ForEach() スレッドの意味がなくなってしまいます。 async - await .

何ができるかというと TPLデータフロー の代わりに Parallel.ForEach() をサポートし、非同期の Task をうまく使っている。

具体的には、あなたのコードを TransformBlock に変換し、それぞれのidを Customer を使用しています。 async ラムダを使用します。このブロックは並列に実行されるように設定することができます。このブロックを ActionBlock を書き込むことで、それぞれの Customer をコンソールに送信します。 ブロックネットワークを設定した後は Post() 各IDを TransformBlock .

コードでは

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

の並列性を制限したいのでしょうが、そのためには TransformBlock をある小さな定数に設定します。また、容量を制限して TransformBlock を使い、非同期にアイテムを追加していきます。 SendAsync() 例えば、コレクションが大きすぎる場合などです。

あなたのコードと比較した場合の追加メリットとして、(うまくいった場合)1つのアイテムが終了するとすぐに書き込みを開始し、すべての処理が終了するまで待つ必要がないことです。