1. ホーム
  2. java

[解決済み] Kafkaで大きなメッセージ(15MB以上)を送信するにはどうすればよいですか?

2022-04-29 01:29:17

質問

Java Producer APIを使用して、Kafka V. 0.8にString-messageを送信しています。 メッセージのサイズが15MB程度になると MessageSizeTooLargeException . を設定しようとしました。 message.max.bytes を40MBに設定しても、例外が発生します。小さなメッセージは問題なく動作しました。

(例外はプロデューサーで発生します。このアプリケーションにはコンシューマーはありません)。

この例外を取り除くにはどうしたらよいですか?

私のプロデューサー設定例

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

エラーログです。

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

解決方法は?

3つ(または4つ)のプロパティを調整する必要があります。

  • 消費者側 fetch.message.max.bytes - は、コンシューマが取得可能なメッセージの最大サイズを決定します。
  • ブローカー側 replica.fetch.max.bytes - これにより、ブローカー内のレプリカがクラスタ内でメッセージを送信し、メッセージが正しく複製されることを確認できます。これが小さすぎると、メッセージは決して複製されず、したがって、メッセージがコミットされない(完全に複製されない)ため、コンシューマがメッセージを見ることはありません。
  • ブローカー側 message.max.bytes - これは、ブローカーがプロデューサーから受け取ることができるメッセージの最大サイズです。
  • ブローカー側(1トピックあたり)。 max.message.bytes - これは、ブローカーがトピックに追加することを許可するメッセージの最大サイズです。このサイズは圧縮前に検証されます。(デフォルトはブローカの message.max.bytes .)

Kafkaから例外やメッセージ、警告を受け取ることができないので、大きなメッセージを送信するときは、この点を考慮する必要があります。