1. ホーム
  2. node.js

[解決済み] RabbitMQ / AMQP:単一のキューで、同じメッセージに対して複数のコンシューマー?

2022-02-18 15:53:40

質問

RabbitMQとAMQPを一般的に使い始めたところです。

  • メッセージのキューがある
  • 複数のコンシューマがあり、それぞれで異なる処理を行いたい。 同じメッセージ .

RabbitMQのドキュメントのほとんどは、ラウンドロビン、つまり1つのメッセージが1つのコンシューマによって消費され、負荷が各コンシューマ間で分散されることに焦点を当てているようです。これは実際に私が目撃した動作です。

例えば、プロデューサーが1つのキューを持ち、2秒ごとにメッセージを送信する場合です。

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

そして、これが消費者です。

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

コンシューマを2回起動すると 各コンシューマがラウンドロビン方式でメッセージを交互に消費しているのがわかります。例えば、1つの端末でメッセージ1、3、5を、もう1つの端末で2、4、6を見ることができます。 .

質問なのですが

  • 各コンシューマーに同じメッセージを受信させることはできますか?つまり、両方のコンシューマーがメッセージ1、2、3、4、5、6を受け取るようにできますか?AMQP/RabbitMQでは、これは何と呼ばれていますか?通常、どのように設定されますか?

  • これは一般的に行われていることなのでしょうか?代わりに、Exchangeがメッセージを2つの別々のキューにルーティングし、単一のコンシューマを持つようにすればいいのでしょうか?

解決方法は?

各コンシューマーに同じメッセージを受信させることはできますか?つまり、両方のコンシューマーがメッセージ1、2、3、4、5、6を受け取るようにできますか?AMQP/RabbitMQでは、これは何と呼ばれていますか?通常、どのように設定するのですか?

いいえ、コンシューマが同じキューにいる場合は違います。RabbitMQの AMQPのコンセプト のガイドを参照してください。

AMQP 0-9-1 では、メッセージはコンシューマ間で負荷分散されることを理解しておくことが重要です。

これは、次のことを意味しているようです。 キュー内のラウンドロビンの動作は決まっています。 であり、設定可能ではありません。つまり、同じメッセージIDを複数のコンシューマで処理させるためには、別々のキューが必要なのです。

このようなことは一般的に行われているのでしょうか?代わりに、交換機がメッセージを2つの別々のキューにルーティングし、1つのコンシューマーを持つようにすればいいのでしょうか?

単一キュー/複数コンシューマーで、各コンシューマーが同じメッセージIDを処理することはできません。交換機がメッセージを2つの別々のキューにルーティングすることは、確かに良いことです。

あまり複雑なルーティングを必要としないので ファンアウトエクスチェンジ がうまく処理します。node-amqpには'default exchange'という概念があり、接続に直接メッセージを発行することができますが、ほとんどのAMQPメッセージは特定のExchangeに発行されるので、先ほどExchangeにあまり焦点を当てませんでした。

これが私のファンアウトのエクスチェンジで、送信と受信の両方です。

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})