[解決済み] RabbitMQ / AMQP:単一のキューで、同じメッセージに対して複数のコンシューマー?
質問
RabbitMQとAMQPを一般的に使い始めたところです。
- メッセージのキューがある
- 複数のコンシューマがあり、それぞれで異なる処理を行いたい。 同じメッセージ .
RabbitMQのドキュメントのほとんどは、ラウンドロビン、つまり1つのメッセージが1つのコンシューマによって消費され、負荷が各コンシューマ間で分散されることに焦点を当てているようです。これは実際に私が目撃した動作です。
例えば、プロデューサーが1つのキューを持ち、2秒ごとにメッセージを送信する場合です。
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
そして、これが消費者です。
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
コンシューマを2回起動すると 各コンシューマがラウンドロビン方式でメッセージを交互に消費しているのがわかります。例えば、1つの端末でメッセージ1、3、5を、もう1つの端末で2、4、6を見ることができます。 .
質問なのですが
-
各コンシューマーに同じメッセージを受信させることはできますか?つまり、両方のコンシューマーがメッセージ1、2、3、4、5、6を受け取るようにできますか?AMQP/RabbitMQでは、これは何と呼ばれていますか?通常、どのように設定されますか?
-
これは一般的に行われていることなのでしょうか?代わりに、Exchangeがメッセージを2つの別々のキューにルーティングし、単一のコンシューマを持つようにすればいいのでしょうか?
解決方法は?
各コンシューマーに同じメッセージを受信させることはできますか?つまり、両方のコンシューマーがメッセージ1、2、3、4、5、6を受け取るようにできますか?AMQP/RabbitMQでは、これは何と呼ばれていますか?通常、どのように設定するのですか?
いいえ、コンシューマが同じキューにいる場合は違います。RabbitMQの AMQPのコンセプト のガイドを参照してください。
AMQP 0-9-1 では、メッセージはコンシューマ間で負荷分散されることを理解しておくことが重要です。
これは、次のことを意味しているようです。 キュー内のラウンドロビンの動作は決まっています。 であり、設定可能ではありません。つまり、同じメッセージIDを複数のコンシューマで処理させるためには、別々のキューが必要なのです。
このようなことは一般的に行われているのでしょうか?代わりに、交換機がメッセージを2つの別々のキューにルーティングし、1つのコンシューマーを持つようにすればいいのでしょうか?
単一キュー/複数コンシューマーで、各コンシューマーが同じメッセージIDを処理することはできません。交換機がメッセージを2つの別々のキューにルーティングすることは、確かに良いことです。
あまり複雑なルーティングを必要としないので ファンアウトエクスチェンジ がうまく処理します。node-amqpには'default exchange'という概念があり、接続に直接メッセージを発行することができますが、ほとんどのAMQPメッセージは特定のExchangeに発行されるので、先ほどExchangeにあまり焦点を当てませんでした。
これが私のファンアウトのエクスチェンジで、送信と受信の両方です。
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}
// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})
関連
-
[解決済み】mongoError: トポロジーが破壊されました
-
[解決済み】Nodejs: Errorを解決する方法。ENOENT: そのようなファイルまたはディレクトリがありません
-
[解決済み】AWS STS AssumeRoleへのアクセスを可能にする方法
-
[解決済み】Google spreadsheet api Requestに不十分な認証スコープがあった。
-
[解決済み】E11000重複キーエラー mongodb mongooseのインデックス
-
[解決済み] ブラウザで動作しているURLで「connect ETIMEDOUT」エラーが発生する原因は何ですか?
-
[解決済み] nodeファイルの先頭にある"/usr/bin/env node "は、具体的には何をするのですか?
-
[解決済み] npm install エラー - ローカルの発行者証明書を取得できません。
-
[解決済み] Heroku "状態が起動から停止に変更されました SIGTERMで全プロセスを停止"
-
[解決済み] のエラーが発生しました。これはおそらくnpmの問題ではありません。上に追加のログ出力があると思われます
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み】「安全なTLS接続が確立される前にクライアントのネットワークソケットが切断されました」(ノード10
-
[解決済み】Google spreadsheet api Requestに不十分な認証スコープがあった。
-
[解決済み】MongoDBでコレクションを日付で並べ替えるには?
-
[解決済み】MongoDBのデータ/DBが見つからない
-
[解決済み】nodemon - app crashed - waiting for file changes before start
-
[解決済み] Expectアサーションの型エラー -> expect(...).toExistは関数ではない
-
[解決済み] AWS s3 api error: specified bucket does not exist.
-
[解決済み] nodejsでfindAllのソート順を続編にする
-
[解決済み] エラーメッセージ MongoError: bad auth URI 文字列で認証に失敗しました。
-
[解決済み] エラー: ノード出力を "|head" にパイプするときに EPIPE を書き込む。