1. ホーム
  2. c++

[解決済み] boost::asio::io_service の実行メソッドがブロック/アンブロックされたときの混乱

2023-01-05 15:09:01

質問

Boost.Asioの全くの初心者のため、以下の点に戸惑っています。 io_service::run() . このメソッドがブロック/アンブロックされるタイミングについて、どなたか説明していただけると幸いです。ドキュメントにはこう書かれています。

このメソッドは run() 関数は、すべての作業が終了してディスパッチするハンドラがなくなるまで、あるいは io_service が停止されるまでブロックします。

複数のスレッドが run() 関数を呼び出してスレッドのプールを設定し、そこから io_service がハンドラを実行することができます。プールで待機しているスレッドは全て等価であり io_service はハンドラを起動するためにそれらのうちのどれかを選ぶことができます。

からの通常の終了は run() 関数は io_service オブジェクトが停止していることを意味します ( stopped() 関数は真を返す)。それ以降の run() , run_one() , poll() または poll_one() を呼び出すと、事前に reset() .

次の文は何を意味するのでしょうか?

[...] ディスパッチされるハンドラはもうない [...] 。


の挙動を理解しようとしているうちに io_service::run() の挙動を理解しようとしていたところ、次のようなものに出会いました。 の例です。 を見つけました (例 3a)。 その中で、私は io_service->run() がブロックし、作業指示を待っていることがわかります。

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

しかし、私が作業していた以下のコードでは、クライアントがTCP/IPを使って接続し、データが非同期に受信されるまでrunメソッドがブロックされます。

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

についての説明 run() の挙動を説明するような説明があれば、ありがたいです。

どのように解決するのですか?

基礎知識

簡単な例から始めて、関連するBoost.Asioの部品を調べてみましょう。

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

とは何ですか? ハンドラ ?

A ハンドラ はコールバック以外の何ものでもありません。 例のコードでは、3つのハンドラがあります。

  • print ハンドラ(1)を使用します。
  • handle_async_receive ハンドラ (3)。
  • print ハンドラ (4)。

にもかかわらず、同じ print() 関数が 2 回使用されたとしても、それぞれの使用はそれ自身のユニークに識別可能なハンドラを作成するとみなされます。 ハンドラには様々な形や大きさがあり、上記のような基本的な関数から boost::bind() やラムダから生成されるファンクタのような、 より複雑な構造まで様々です。 複雑さに関係なく、ハンドラは依然としてコールバック以外の何ものでもありません。

とは 仕事 ?

作業とは、Boost.Asioがアプリケーションコードに代わって行うよう要求された処理のことです。 Boost.Asioは作業を指示されるとすぐに作業を開始することもあれば、後日作業を行うために待機することもあります。 作業が完了すると、Boost.Asioはアプリケーションに通知するために、提供された ハンドラ .

Boost.Asioが保証するのは ハンドラ を現在呼び出しているスレッド内でのみ実行されることを保証します。 run() , run_one() , poll() または poll_one() . これらは、作業を行い ハンドラ . したがって、上記の例では print() に投稿されても呼び出されません。 io_service (1). その代わり、それは io_service に追加され、後の時点で呼び出されることになります。 この場合 io_service.run() (5).

非同期操作とは何ですか?

非同期操作 が仕事を作り、Boost.Asioはその仕事のために ハンドラ を呼び出して、作業が完了したときにアプリケーションに通知します。 非同期操作は、接頭辞が async_ . これらの関数は 開始関数 .

非同期操作は、3つのユニークなステップに分解することができます。

  • 関連するアプリケーションを起動する、または通知する io_service に知らせる。 その async_receive の操作で、(3) は io_service に、ソケットから非同期にデータを読み込む必要があることを知らせると async_receive はすぐに返します。
  • 実作業を行う。 この場合、いつ socket がデータを受け取ると、バイトが読み込まれて buffer . 実際の作業は、どちらかで行われます。
    • Boost.Asioがブロックしないと判断できる場合、開始関数(3)。
    • アプリケーションが明示的に実行する場合 io_service (5).
  • を呼び出すと handle_async_receive リードハンドラ . もう一度 ハンドラ を実行しているスレッド内でのみ呼び出されます。 io_service . したがって、作業がいつ行われるか(3か5)に関係なく、以下のことが保証されます。 handle_async_receive() が起動されるのは io_service.run() (5).

この3つのステップの間に時間と空間の隔たりがあることを、コントロールフローの反転と呼びます。 非同期プログラミングを難しくしている複雑さの一つです。 しかし、これを緩和するためのテクニックがあり、例えば コルーチン .

何をするのか io_service.run() は何をするのか?

スレッドが io_service.run() を呼び出すと、仕事と ハンドラ はこのスレッド内から呼び出される。 上の例では io_service.run() (5)はどちらかになるまでブロックされます。

  • の両方から呼び出され、返されます。 print ハンドラから返され、受信処理は成功または失敗で完了し、その handle_async_receive ハンドラが呼び出され、返されました。
  • io_service を明示的に停止させるには io_service::stop() .
  • ハンドラ内から例外がスローされます。

psuedo的な流れの可能性としては、以下のように記述することができます。

io_serviceを作成
ソケットを作成する
io_serviceに印刷ハンドラを追加(1)
ソケットの接続を待つ(2)
io_serviceに非同期の読み込み作業依頼を追加する (3)
io_serviceにprint handlerを追加 (4)
io_serviceを実行する (5)
  ワークやハンドラはありますか?
    はい、ワークが1つ、ハンドラが2つあります。
      ソケットにデータがあるか? いいえ、何もしません。
      プリントハンドラを実行 (1)
  ワークまたはハンドラがありますか?
    はい、1つのワークと1つのハンドラがあります。
      ソケットにデータはありますか?
      プリントハンドラ(4)を実行
  ワークまたはハンドラがありますか?
    はい、1つのワークがあります。
      ソケットにデータがあるか? いいえ、そのまま待ち続ける
  -- ソケットがデータを受信
      ソケットにデータがある場合、それをバッファに読み込む
      io_serviceにhandle_async_receiveハンドラを追加する。
  作業やハンドラはありますか?
    はい、1つのハンドラです。
      handle_async_receive ハンドラを実行する (3)
  仕事かハンドラか?
    いいえ、io_serviceをstoppedとし、returnします。

読み込みが終了したときに、別の ハンドラ io_service . この微妙なディテールが、非同期プログラミングの重要な特徴である。 これによって ハンドラ を連鎖させることができます。 例えば、もし handle_async_receive が期待したデータをすべて取得できなかった場合、その実装は別の非同期読み取り操作をポストすることができ、その結果 io_service にはより多くの作業が発生し、その結果 io_service.run() .

ただし io_service が使い果たされた場合、アプリケーションは reset() io_service を実行してから、再度実行します。


質問例と質問例3aのコード

さて、質問で参照された2つのコードを調べてみましょう。

質問コード

socket->async_receive に作業を追加します。 io_service . このように io_service->run() は読み込み操作が成功またはエラーで完了するまでブロックされ ClientReceiveEvent が実行を終了するか、例外を投げるまでブロックします。

例 3a コード

より分かりやすくするために、例3aの注釈を小さくしたものを掲載します。

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

高レベルのプログラムでは、2つのスレッドを作成して io_service のイベントループを処理する2つのスレッドを作成します(2)。 この結果、フィボナッチ数を計算するシンプルなスレッドプールができあがります(3)。

質問コードとこのコードの大きな違いは、このコードで呼び出した io_service::run() (2) に実際の作業とハンドラが追加されます。 io_service (3). を防ぐために io_service::run() がすぐに戻ってこないように io_service::work オブジェクトが生成される(1)。 このオブジェクトは io_service が仕事不足になるのを防ぎます。したがって io_service::run() は仕事がないために戻ることはありません。

全体の流れは次のようになります。

  1. を作成し io_service::work オブジェクトに追加された io_service .
  2. を呼び出すスレッドプールを作成しました。 io_service::run() . これらのワーカスレッドは io_service から戻ってきません。 io_service::work オブジェクトになります。
  3. フィボナッチ数を計算するハンドラを3つ追加して io_service に追加し、すぐに返します。 メインスレッドではなくワーカスレッドが、これらのハンドラの実行をすぐに開始することができます。
  4. を削除します。 io_service::work オブジェクトを削除します。
  5. ワーカスレッドの実行が終了するのを待ちます。 これは、3 つのハンドラすべてが実行を終了したときにのみ発生します。 io_service にはハンドラも仕事もないからです。

このコードは、オリジナルコードと同じ方法で、ハンドラを io_service に、そして io_service のイベントループが処理されます。 これによって io_service::work を使用する必要がなくなり、以下のようなコードになります。

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}


同期と非同期

問題のコードは非同期処理を使用していますが、非同期処理の完了を待っているため、実質的に同期的に機能しています。

socket.async_receive(buffer, handler)
io_service.run();

とは等価である。

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

一般的な経験則として、同期と非同期の操作を混在させないようにしてください。 多くの場合、複雑なシステムを複雑なシステムにしてしまう可能性があります。 この 答え は、非同期プログラミングの利点を強調していますが、そのうちのいくつかは、Boost.Asioで取り上げられている ドキュメント .