1. ホーム
  2. rabbitmq

[解決済み] RabbitMQのコンシューマテストを自動化する

2022-02-20 08:10:50

質問内容

私はRabbitMQクライアントを使用してメッセージを受信する.netのマイクロサービスを持っています、私は以下をテストする必要があります。

1- コンシューマがrabbitMqホストに正常に接続されている。

2- コンシューマーがキューをリッスンしています。

3- コンシューマーはメッセージを正常に受信しています。

上記を実現するために、メッセージを送信するサンプルアプリケーションを作成し、コンシューマーがメッセージを受信していることを確認するためにデバッグしています。

このテストを自動化し、マイクロサービスCIに組み込むにはどうすればよいですか?

私はCIに私のサンプルアプリを含めることを考えています。そうすれば、メッセージを発射し、特定の時間待機し、メッセージを受信した場合に合格するコンシューマユニット・テストを実行できます。

もう一つの方法は、ユニットテスト自体からサンプルアプリを起動することですが、サンプルアプリが動作しない場合、それはサービス障害となります。

RabbitMQを介して接続するマイクロサービスの統合テストのためのベストプラクティスはありますか?

どのように解決するのですか?

私はこのようなテストをたくさん作ってきました。私はいくつかの基本的なコードを .NET Core 2.0を使ったGitHubはこちらです。

これらの自動テストには、RabbitMQクラスタが必要です。各テストは、すでに存在するメッセージがないことを確認するために、キューを削除することから始まります。他のテストから既に存在するメッセージがあると、現在のテストが中断されます。

キューを削除するための簡単なヘルパーを用意しました。私のアプリケーションでは、常に独自のキューを宣言していますが、それがあなたのケースでない場合は、キューを再び作成し、任意の交換へのバインディングを作成する必要があります。

public class QueueDestroyer
{
    public static void DeleteQueue(string queueName, string virtualHost)
    {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = "localhost";
        connectionFactory.UserName = "guest";
        connectionFactory.Password = "guest";
        connectionFactory.VirtualHost = virtualHost;
        var connection = connectionFactory.CreateConnection();
        var channel = connection.CreateModel();
        channel.QueueDelete(queueName);
        connection.Close();
    }
}

あなたのマイクロサービスを表す、とてもシンプルなコンシューマの例を作りました。これは、キャンセルされるまでタスクで実行されます。

public class Consumer
{
    private IMessageProcessor _messageProcessor;
    private Task _consumerTask;

    public Consumer(IMessageProcessor messageProcessor)
    {
        _messageProcessor = messageProcessor;
    }

    public void Consume(CancellationToken token, string queueName)
    {
        _consumerTask = Task.Run(() =>
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: queueName,
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        _messageProcessor.ProcessMessage(message);
                    };
                    channel.BasicConsume(queue: queueName,
                                        autoAck: false,
                                            consumer: consumer);

                    while (!token.IsCancellationRequested)
                        Thread.Sleep(1000);
                }
            }
        });
    }

    public void WaitForCompletion()
    {
        _consumerTask.Wait();
    }
}

コンシューマはメッセージの処理作業を行うIMessageProcessorインターフェイスを持っています。私の統合テストでは、偽物を作りました。おそらく、お好みのモッキングフレームワークを使用することになるでしょう。

テストパブリッシャーはキューにメッセージをパブリッシュします。

public class TestPublisher
{
    public void Publish(string queueName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                    routingKey: queueName,
                                    basicProperties: null,
                                    body: body);
        }
    }
}

私のテスト例はこのような感じです。

[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
    // ARRANGE
    QueueDestroyer.DeleteQueue("queueX", "/");
    var cts = new CancellationTokenSource();
    var fake = new FakeProcessor();
    var myMicroService = new Consumer(fake);

    // ACT
    myMicroService.Consume(cts.Token, "queueX");

    var producer = new TestPublisher();
    producer.Publish("queueX", "hello");

    Thread.Sleep(1000); // make sure the consumer will have received the message
    cts.Cancel();

    // ASSERT
    Assert.Equal(1, fake.Messages.Count);
    Assert.Equal("hello", fake.Messages[0]);
}

私のフェイクはこれです。

public class FakeProcessor : IMessageProcessor
{
    public List<string> Messages { get; set; }

    public FakeProcessor()
    {
        Messages = new List<string>();
    }

    public void ProcessMessage(string message)
    {
        Messages.Add(message);
    }
}

追加のアドバイスは

  • キューにランダムなテキストを追加し、テスト実行ごとに名前を交換できるのであれば、そうして同時進行のテストが互いに干渉し合うのを避けましょう。

  • キュー、エクスチェンジ、バインディングを宣言するためのヘルパーもコードに含まれていますが、もしあなたのアプリケーションがそれを行わないのであれば。

  • コネクションキラークラスを作成し、接続を強制終了させ、アプリケーションがまだ動作し、回復できることを確認します。私はそのためのコードを持っていますが、.NET Coreではありません。私に依頼していただければ、.NET Coreで動作するように修正することができます。

  • 一般的に、統合テストに他のマイクロサービスを含めるのは避けた方がいいと思うんだ。例えば、あるサービスから別のサービスにメッセージを送信し、メッセージが戻ってくることを期待する場合、期待される振る舞いを模擬できる偽のコンシューマを作成します。もし他のサービスからメッセージを受け取るのであれば、統合テストプロジェクトに偽のパブリッシャーを作りましょう。