1. ホーム
  2. apache-kafka

[解決済み] トピックのデータをすべて削除したり、毎回実行する前にトピックを削除する方法はありますか?

2023-02-01 18:11:49

質問

トピックのデータをすべて削除したり、毎回実行前にトピックを削除する方法はありますか?

KafkaConfig.scalaファイルを修正して、その中の logRetentionHours プロパティを変更できますか? コンシューマーがメッセージを読んだらすぐに削除される方法はありますか?

私はどこからかデータを取得し、コンシューマが消費する特定のトピックにデータを送信するためにプロデューサーを使用しています。 私は、トピックに毎回新しいデータだけが欲しいのです。 何らかの方法でトピックを再初期化する方法はありますか?

どのように解決するのですか?

これを見てください JIRA 課題 トピック削除サポートの追加"。

手動で削除する場合。

  1. クラスターをシャットダウンする
  2. カフカログのディレクトリを掃除する ( log.dir 属性で指定されます。 コンフィグ ファイル) と zookeeper のデータです。
  3. クラスターを再起動する

任意のトピックに対して、あなたができることは

  1. カフカを停止する
  2. パーティションごとにカフカのログを消去する場合、カフカはログファイルを "logDir/topic-partition" というフォーマットで保存します。 /tmp/kafka-logs/MyTopic-0 ここで /tmp/kafka-logslog.dir 属性で指定されます。
  3. カフカを再起動する

これは NOT を使用するのは良い方法であり、推奨される方法ですが、うまくいくはずです。 Kafka ブローカー設定ファイルでは log.retention.hours.per.topic 属性は The number of hours to keep a log file before deleting it for some specific topic

また、コンシューマがメッセージを読んだらすぐに削除されるような方法はありますか?

からの Kafka ドキュメント :

Kafkaクラスタは、消費されたかどうかにかかわらず、公開されたすべてのメッセージを設定可能な期間保持します。たとえば、ログの保存期間が2日に設定されている場合、メッセージが公開されてから2日間は消費可能で、それ以降は空き容量を確保するために破棄されます。Kafkaのパフォーマンスはデータサイズに対して実質的に一定なので、多くのデータを保持しても問題ありません。

実際、コンシューマごとに保持されるメタデータは、ログ内のコンシューマの位置だけで、オフセットと呼ばれます。このオフセットはコンシューマによって制御されます。通常、コンシューマはメッセージを読み込むとそのオフセットを直線的に進めますが、実際には位置はコンシューマによって制御され、好きな順序でメッセージを消費することができます。たとえば、コンシューマは古いオフセットにリセットして再処理を行うことができます。

Kafka 0.8で読み込む開始オフセットを見つけるには シンプルなコンシューマーの例 と言っています。

Kafkaには2つの定数が用意されています。 kafka.api.OffsetRequest.EarliestTime() は、ログにあるデータの先頭を見つけ、そこからストリーミングを開始します。 kafka.api.OffsetRequest.LatestTime() は新しいメッセージのみをストリーミングします。

また、コンシューマ側でオフセットを管理するためのサンプルコードもそこにあります。

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}