[解決済み] ConcurrentKafkaListenerContainerFactoryはいつ使用するのですか?
質問
私はkafkaの初心者です。
ドキュメント
が、何も理解できませんでした。どなたか、どのような場合に
ConcurrentKafkaListenerContainerFactory
クラスはどのようなものですか?私はこれまで
Kafkaconsumer
クラスが表示されますが
ConcurrentKafkaListenerContainerFactory
は、私の現在のプロジェクトで使用されています。どのような目的で使用されているのか説明してください。
どのように解決するのですか?
Kafkaコンシューマはスレッドセーフではありません。すべてのネットワークI/Oは、呼び出しを行ったアプリケーションのスレッドで発生します。マルチスレッドでのアクセスが適切に同期されていることを確認するのは、ユーザーの責任です。非同期なアクセスは、結果として
ConcurrentModificationException.
コンシューマーがデータを取得するために複数のパーティションを割り当てられた場合、コンシューマーはすべてのパーティションから同時にデータを取得しようとし、事実上、これらのパーティションに同じ消費優先度を与えます。しかし、場合によっては、コンシューマはまず割り当てられたパーティションのサブセットから全速力でデータを取得することに集中し、他のパーティションで消費するデータがほとんどない場合にのみ、他のパーティションの取得を開始したいと思うかもしれません。
スプリングカフカ
ConcurrentKafkaListenerContainerFactory
は、アノテーションされたメソッドのコンテナを作成するために
@KafkaListener
には、2つの
MessageListenerContainer
Spring kafkaの
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
は
KafkaMessageListenerContainer
は、すべてのトピックまたはパーティションからのすべてのメッセージを1つのスレッドで受信します。またConcurrentMessageListenerContainer
を1つまたは複数のKafkaMessageListenerContainer
インスタンスを作成し、マルチスレッドで消費できるようにします。
ConcurrentMessageListenerContainerの使用方法
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
concurrencyプロパティを持つ。例えば、container.setConcurrency(3) は、3つの
KafkaMessageListenerContainer
のインスタンスを作成します。
が6つある場合
TopicPartition
インスタンスが提供され、同時実行数が3である場合、各コンテナは2つのパーティションを取得します。5 つの TopicPartition インスタンスが提供されている場合、2 つのコンテナは 2 つのパーティションを取得し、3 番目のコンテナは 1 つのパーティションを取得します。同時実行数が TopicPartition の数より大きい場合は、各コンテナが 1 つのパーティションを取得するように同時実行数が調整されます。
以下は分かりやすい例とドキュメントです。 こちら
関連
-
[解決済み】予期しない型エラー
-
[解決済み] JavaでArrayListではなくLinkedListを使用するのはいつですか?
-
[解決済み] serialVersionUIDとは何ですか、またなぜそれを使用する必要がありますか?
-
[解決済み] java.net.URLConnectionを使用してHTTPリクエストを発生させ処理する方法
-
[解決済み] 静的メソッドを使用する場合
-
[解決済み] 静的変数が悪とされるのはなぜですか?
-
[解決済み] Javaにおける "final class "の意味とは?
-
[解決済み】Android UserManager.isUserAGoat()の正しい使用例?)
-
[解決済み】Kafkaを(CQRS)イベントストアとして使用する。良いアイデアですか?
-
[解決済み】Kafkaコンシューマオフセットを決定するのは何ですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み】Android Studio クラス org.codehaus.groovy.runtime.InvokerHelper を初期化できませんでした。
-
[解決済み】imageio.IIOException: 入力ファイルが読み込めない
-
[解決済み】popBackStack()とreplace()の操作はどう違うのですか?
-
[解決済み】"比較メソッドはその一般契約に違反する!"
-
[解決済み】HTTPステータス500 サーブレットクラスのインスタンス化エラー [重複]。
-
[解決済み] 解決済み】Javaが「型をインスタンス化できない」というエラーを返す [重複] [重複]
-
[解決済み】Javaの部分文字列:「文字列のインデックスが範囲外」。
-
[解決済み】なぜjava.io.Fileにはcloseメソッドがないのでしょうか?
-
[解決済み】Javaを使用するSelenium - ドライバの実行ファイルのパスは、webdriver.gecko.driverシステムプロパティで設定する必要があります。
-
[解決済み】純粋なJUnitテストにVisibleForTestingを使用する方法