I. Introduction to RocketMQ

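Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity, and flexible scalability. It consists of four parts: the NameServer, the Broker, the Producer, and the Consumer.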

II. Installing RocketMQ with Docker

1. Search for RocketMQ images

docker search rocketmq

2. Start the NameServer

docker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:server-4.7.0

3. Start the broker

Edit the broker.conf file.

vim /home/rocketmq/broker.conf


# Cluster name; brokers on different nodes that share this name belong to the same cluster
brokerClusterName = DefaultCluster
# Broker name; a master and its slaves use the same name to indicate their master-slave relationship
brokerName = broker-a
# 0 means master; a value greater than 0 identifies a slave
brokerId = 0
# Hour of day at which expired messages are deleted; the default is 04 (4:00 am)
deleteWhen = 04
# How long a message is kept on disk, in hours
fileReservedTime = 48
# One of SYNC_MASTER, ASYNC_MASTER, SLAVE; SYNC and ASYNC describe how data is replicated from the master to its slaves
brokerRole = ASYNC_MASTER
# Disk flush policy, ASYNC_FLUSH or SYNC_FLUSH; with SYNC_FLUSH a message is written to disk before success is returned, with ASYNC_FLUSH it is not
flushDiskType = ASYNC_FLUSH
# IP address of the server on which this broker node runs
brokerIP1 =
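
The comments in broker.conf imply a few consistency rules, for example that a master uses brokerId 0 and a slave uses a value greater than 0. The following is a minimal sketch of how those rules could be checked before the broker is started. It assumes the file can be read as key = value lines with java.util.Properties; the class name BrokerConfCheck and its methods are hypothetical helpers, not part of RocketMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: sanity-checks the role-related keys of a broker.conf.
class BrokerConfCheck {
    static final List<String> ROLES = Arrays.asList("SYNC_MASTER", "ASYNC_MASTER", "SLAVE");
    static final List<String> FLUSH_TYPES = Arrays.asList("ASYNC_FLUSH", "SYNC_FLUSH");

    /** Parses broker.conf text; lines starting with '#' are treated as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable broker.conf", e);
        }
        return props;
    }

    /** Returns a list of problems; an empty list means the checked keys are consistent. */
    static List<String> validate(Properties props) {
        List<String> problems = new ArrayList<>();
        String role = props.getProperty("brokerRole", "").trim();
        String flush = props.getProperty("flushDiskType", "").trim();
        String id = props.getProperty("brokerId", "").trim();
        if (!ROLES.contains(role)) {
            problems.add("unknown brokerRole: " + role);
        }
        if (!FLUSH_TYPES.contains(flush)) {
            problems.add("unknown flushDiskType: " + flush);
        }
        try {
            long brokerId = Long.parseLong(id);
            boolean slave = role.equals("SLAVE");
            if (brokerId < 0) {
                problems.add("brokerId must not be negative");
            } else if (slave && brokerId == 0) {
                problems.add("a SLAVE needs a brokerId greater than 0");
            } else if (!slave && ROLES.contains(role) && brokerId != 0) {
                problems.add("a master needs brokerId 0");
            }
        } catch (NumberFormatException e) {
            problems.add("brokerId is not a number: " + id);
        }
        return problems;
    }

    public static void main(String[] args) {
        // The role-related keys from the broker.conf shown above.
        String conf = String.join("\n",
                "brokerName = broker-a",
                "brokerId = 0",
                "brokerRole = ASYNC_MASTER",
                "flushDiskType = ASYNC_FLUSH");
        List<String> problems = validate(parse(conf));
        System.out.println(problems.isEmpty() ? "broker.conf looks consistent" : problems.toString());
    }
}
```

The check covers only the keys whose allowed values are spelled out in the comments; other keys such as brokerIP1 are left alone.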


docker run -d -p 10911:10911 -p 10909:10909\
 --name rmqbroker --link rmqserver:namesrv\
 -e "NAMESRV_ADDR=" -e "JAVA_OPTS=-Duser.home=/opt"\
 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
 -v /home/rocketmq/broker.conf:/etc/rocketmq/broker.conf \

4. Start the RocketMQ console

docker run -d --name rmqconsole -p 8080:8080 --link rmqserver:namesrv\
 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=
 -t styletang/rocketmq-console-ng
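
Once the containers are up, it can help to confirm that the published ports actually accept connections before moving on. The sketch below tries a plain TCP connection to each port mapped in the docker run commands (9876, 10911, 10909, 8080). The class name PortCheck is a hypothetical helper, and the default host localhost is an assumption that only holds when Docker runs on the same machine.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether the ports published by the containers accept TCP connections.
class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, unreachable, or timed out: treat all of these as "not open".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the containers run on this machine; pass another host as the first argument otherwise.
        String host = args.length > 0 ? args[0] : "localhost";
        // NameServer, broker (two ports), and console, as mapped with -p above.
        int[] ports = {9876, 10911, 10909, 8080};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable = " + isPortOpen(host, port, 1000));
        }
    }
}
```

An open port only shows that something is listening; it does not prove that the broker has registered with the NameServer.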

5. Open the visualization page

III. Using RocketMQ from Java

1. Add the dependency to pom.xml


2. Create the producer

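import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

// 1 Create the message producer, specifying the producer group name
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("dzp-producer-group");
// 2 Specify the address of the NameServer (the host is a placeholder; 9876 is the port published above)
defaultMQProducer.setNamesrvAddr("<nameserver-ip>:9876");
// 3 Start the producer
defaultMQProducer.start();
// 4 Build the message object: topic, tag, key, and body
Message message = new Message("dzp-topic", "dzp-tag", "dzp-key", "dzp test message sent".getBytes(StandardCharsets.UTF_8));
// 5 Send the message synchronously and print the result
SendResult result = defaultMQProducer.send(message);
System.out.println("SendResult-->" + result);
// 6 Close the producer
defaultMQProducer.shutdown();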

3. Create the consumer

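import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

// 1 Create the consumer, specifying the name of the consumer group to which it belongs
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("dzp-consumer-group");
// 2 Specify the address of the NameServer (the host is a placeholder; 9876 is the port published above)
defaultMQPushConsumer.setNamesrvAddr("<nameserver-ip>:9876");
// 3 Specify the topic and tags to which the consumer subscribes ("*" selects every tag)
defaultMQPushConsumer.subscribe("dzp-topic", "*");
// 4 Register the callback function that handles each batch of messages
defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
    // Consume the messages; if anything fails, catch it and return ConsumeConcurrentlyStatus.RECONSUME_LATER
    try {
        for (MessageExt messageExt : list) {
            String topic = messageExt.getTopic();
            System.out.println("topic-->" + topic);

            String tags = messageExt.getTags();
            System.out.println("tags-->" + tags);

            String keys = messageExt.getKeys();
            System.out.println("keys-->" + keys);

            String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            System.out.println("body-->" + body);
        }
    } catch (Throwable throwable) {
        // Processing failed: ask for the batch to be delivered again later
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 5 Start the consumer
defaultMQPushConsumer.start();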
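
The second argument of subscribe is a tag expression: "*" selects every tag, and several tags can be combined with "||". The sketch below is a simplified, standalone model of that matching rule, written only to illustrate which messages a given expression would select. TagFilter and matches are hypothetical names, and the real filtering is done by RocketMQ itself, not by code like this.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of tag-expression matching; not RocketMQ's own implementation.
class TagFilter {
    /** Returns true if a message carrying the given tag is selected by the subscription expression. */
    static boolean matches(String subExpression, String tag) {
        // A missing, empty, or "*" expression subscribes to every message of the topic.
        if (subExpression == null || subExpression.trim().isEmpty() || subExpression.trim().equals("*")) {
            return true;
        }
        // In this model an untagged message is never selected by a concrete tag expression.
        if (tag == null) {
            return false;
        }
        // "a || b" selects messages tagged either a or b.
        Set<String> wanted = new HashSet<>();
        for (String part : subExpression.split("\\|\\|")) {
            String candidate = part.trim();
            if (!candidate.isEmpty()) {
                wanted.add(candidate);
            }
        }
        return wanted.contains(tag);
    }

    public static void main(String[] args) {
        // "dzp-tag" is the tag set by the producer above.
        System.out.println(matches("*", "dzp-tag"));                  // true
        System.out.println(matches("other-tag || dzp-tag", "dzp-tag")); // true
        System.out.println(matches("other-tag", "dzp-tag"));          // false
    }
}
```

With the subscription used above, subscribe("dzp-topic", "*"), the consumer receives the producer's message regardless of its tag.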

4. Send a message from the visualization page


5. Data received by the Java program