1. ホーム
  2. java

[解決済み] ConcurrentKafkaListenerContainerFactoryはいつ使用するのですか?

2022-02-12 10:15:13

質問

私はkafkaの初心者です。 ドキュメント が、何も理解できませんでした。どなたか、どのような場合に ConcurrentKafkaListenerContainerFactory クラスはどのようなものですか?私はこれまで Kafkaconsumer クラスが表示されますが ConcurrentKafkaListenerContainerFactory は、私の現在のプロジェクトで使用されています。どのような目的で使用されているのか説明してください。

どのように解決するのですか?

Kafkaコンシューマはスレッドセーフではありません。すべてのネットワークI/Oは、呼び出しを行ったアプリケーションのスレッドで発生します。マルチスレッドでのアクセスが適切に同期されていることを確認するのは、ユーザーの責任です。非同期なアクセスは、結果として ConcurrentModificationException.

<ブロッククオート

コンシューマーがデータを取得するために複数のパーティションを割り当てられた場合、コンシューマーはすべてのパーティションから同時にデータを取得しようとし、事実上、これらのパーティションに同じ消費優先度を与えます。しかし、場合によっては、コンシューマはまず割り当てられたパーティションのサブセットから全速力でデータを取得することに集中し、他のパーティションで消費するデータがほとんどない場合にのみ、他のパーティションの取得を開始したいと思うかもしれません。

スプリングカフカ

ConcurrentKafkaListenerContainerFactory は、アノテーションされたメソッドのコンテナを作成するために @KafkaListener

には、2つの MessageListenerContainer Spring kafkaの

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

KafkaMessageListenerContainer は、すべてのトピックまたはパーティションからのすべてのメッセージを1つのスレッドで受信します。また ConcurrentMessageListenerContainer を1つまたは複数の KafkaMessageListenerContainer インスタンスを作成し、マルチスレッドで消費できるようにします。

ConcurrentMessageListenerContainerの使用方法

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

concurrencyプロパティを持つ。例えば、container.setConcurrency(3) は、3つの KafkaMessageListenerContainer のインスタンスを作成します。

が6つある場合 TopicPartition インスタンスが提供され、同時実行数が3である場合、各コンテナは2つのパーティションを取得します。5 つの TopicPartition インスタンスが提供されている場合、2 つのコンテナは 2 つのパーティションを取得し、3 番目のコンテナは 1 つのパーティションを取得します。同時実行数が TopicPartition の数より大きい場合は、各コンテナが 1 つのパーティションを取得するように同時実行数が調整されます。

以下は分かりやすい例とドキュメントです。 こちら