1. ホーム
  2. javascript

node.jsにおける並列実行の調整

2023-08-03 02:36:08

質問

node.jsのイベント駆動型プログラミングモデルは、プログラムの流れを調整するのがやや厄介です。

単純な連続実行はネストされたコールバックに変わりますが、これは十分に簡単です(書き出すと少し複雑ですが)。

しかし、並列実行はどうでしょうか?例えば、3つのタスクA,B,Cを並列に実行し、それらが終了したら、その結果をタスクDに送りたい場合を考えてみましょう。

fork/joinモデルでは、これは次のようになります。

  • フォークA
  • フォークB
  • フォークC
  • A,B,Cを結合し、Dを実行

node.jsでどのように書けばいいのでしょうか?ベストプラクティスやクックブックはありますか?私は ソリューションをハンドロールする必要がありますか? または、このためのヘルパーを備えたライブラリがありますか?

どのように解決するのですか?

node.jsはシングルスレッドであるため、本当の意味での並列処理は何もできません。しかし、複数のイベントをスケジュールして、事前に判断できない順序で実行することは可能です。また、データベースアクセスのようなものは、データベースクエリ自体は別のスレッドで実行されますが、完了するとイベントストリームに再統合されるという点で、実際に並列です。

では、複数のイベント ハンドラでコールバックをスケジュールするにはどうすればよいのでしょうか。これは、ブラウザサイドの javascript でアニメーションに使用される一般的なテクニックの 1 つで、完了を追跡するために変数を使用します。

これはハックのように聞こえますし、実際そうです。また、追跡を行うためにたくさんのグローバル変数を残しておくことは、潜在的に面倒に聞こえますし、より少ない言語ではそうでしょう。しかし、javascript ではクロージャを使用することができます。

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var callback = function () {
    counter --;
    if (counter == 0) {
      shared_callback()
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A,B,C],D);

上記の例では、非同期関数とコールバック関数が引数を必要としないと仮定して、コードをシンプルに保っています。もちろん、非同期関数に引数を渡し、コールバック関数が結果を蓄積してshared_callback関数に渡すようにコードを修正することができます。


追加の回答です。

実は、そのままでも、その fork() 関数はすでにクロージャを使用して非同期関数に引数を渡すことができます。

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);

は、A,B,Cの結果を蓄積して、Dに渡すだけです。


さらに追加の回答です。

私は我慢できませんでした。朝食中にこのことを考え続けました。以下は fork() の実装で、結果(通常はコールバック関数への引数として渡される)を蓄積するものです。

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  function makeCallback (index) {
    return function () {
      counter --;
      var results = [];
      // we use the arguments object here because some callbacks 
      // in Node pass in multiple arguments as result.
      for (var i=0;i<arguments.length;i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter == 0) {
        shared_callback(all_results);
      }
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](makeCallback(i));
  }
}

簡単でしたね。これは fork() はかなり汎用的で、複数の非同質なイベントを同期させるために使うことができます。

Node.jsでの使用例です。

// Read 3 files in parallel and process them together:

function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
  file1data = result[0][1];
  file2data = result[1][1];
  file3data = result[2][1];

  // process the files together here
}

fork([A,B,C],D);


更新情報

このコードはasync.jsや様々なプロミスベースのライブラリが存在する前に書かれたものです。async.jsがこれに触発されたと信じたいのですが、証拠はありません。とにかく...もしあなたが今日これをしようと思っているなら、async.jsやpromiseを見てみてください。ただ、上記の回答はasync.parallelのようなものがどのように動作するかの良い説明/図解であると考えてください。

完全を期すために、次のように async.parallel :

var async = require('async');

async.parallel([A,B,C],D);

なお async.parallel と全く同じ動作をします。 fork 関数と全く同じように動作します。主な違いは、最初の引数としてエラーを D の第一引数にエラーを渡し、第二引数にコールバックを渡していることです。

プロミスを使うと、以下のように書きます。

// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A,B,C]).then(D);