[解決済み] トピックのデータをすべて削除したり、毎回実行する前にトピックを削除する方法はありますか?
質問
トピックのデータをすべて削除したり、毎回実行前にトピックを削除する方法はありますか?
KafkaConfig.scalaファイルを修正して、その中の
logRetentionHours
プロパティを変更できますか? コンシューマーがメッセージを読んだらすぐに削除される方法はありますか?
私はどこからかデータを取得し、コンシューマが消費する特定のトピックにデータを送信するためにプロデューサーを使用しています。 私は、トピックに毎回新しいデータだけが欲しいのです。 何らかの方法でトピックを再初期化する方法はありますか?
どのように解決するのですか?
これを見てください
JIRA 課題
トピック削除サポートの追加"。
手動で削除する場合。
- クラスターをシャットダウンする
-
カフカログのディレクトリを掃除する (
log.dir
属性で指定されます。 コンフィグ ファイル) と zookeeper のデータです。 - クラスターを再起動する
任意のトピックに対して、あなたができることは
- カフカを停止する
-
パーティションごとにカフカのログを消去する場合、カフカはログファイルを "logDir/topic-partition" というフォーマットで保存します。
/tmp/kafka-logs/MyTopic-0
ここで/tmp/kafka-logs
はlog.dir
属性で指定されます。 - カフカを再起動する
これは
NOT
を使用するのは良い方法であり、推奨される方法ですが、うまくいくはずです。
Kafka ブローカー設定ファイルでは
log.retention.hours.per.topic
属性は
The number of hours to keep a log file before deleting it for some specific topic
また、コンシューマがメッセージを読んだらすぐに削除されるような方法はありますか?
からの Kafka ドキュメント :
Kafkaクラスタは、消費されたかどうかにかかわらず、公開されたすべてのメッセージを設定可能な期間保持します。たとえば、ログの保存期間が2日に設定されている場合、メッセージが公開されてから2日間は消費可能で、それ以降は空き容量を確保するために破棄されます。Kafkaのパフォーマンスはデータサイズに対して実質的に一定なので、多くのデータを保持しても問題ありません。
実際、コンシューマごとに保持されるメタデータは、ログ内のコンシューマの位置だけで、オフセットと呼ばれます。このオフセットはコンシューマによって制御されます。通常、コンシューマはメッセージを読み込むとそのオフセットを直線的に進めますが、実際には位置はコンシューマによって制御され、好きな順序でメッセージを消費することができます。たとえば、コンシューマは古いオフセットにリセットして再処理を行うことができます。
Kafka 0.8で読み込む開始オフセットを見つけるには シンプルなコンシューマーの例 と言っています。
Kafkaには2つの定数が用意されています。
kafka.api.OffsetRequest.EarliestTime()
は、ログにあるデータの先頭を見つけ、そこからストリーミングを開始します。kafka.api.OffsetRequest.LatestTime()
は新しいメッセージのみをストリーミングします。
また、コンシューマ側でオフセットを管理するためのサンプルコードもそこにあります。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
関連
-
[解決済み] グループがリバランスしているため、ハートビートが失敗しました。
-
[解決済み] Kafkaメッセージにカスタムヘッダを追加する
-
[解決済み] kafkaとnifiの違い
-
[解決済み] Kafkaのキー/バリューペアベースのメッセージングは何のためにあるのですか?[クローズド]
-
[解決済み] HDFスキーマレジストリとConfluentの主な違いは何ですか?
-
[解決済み] JAAS または Kafka の設定に serviceName が定義されていない (Kerberos ではない)
-
[解決済み] Zookeeperにアクセスせずに-zookeeperフラグを使用してKafka 0.10トピックをすべて一覧表示する
-
[解決済み] Apache Kafkaのコンテキストで「Rebalancing」とはどういう意味ですか?
-
[解決済み] Kafkaでデータモデリング?トピックとパーティション
-
[解決済み] Kafka >= 0.10.1 における session.timeout.ms と max.poll.interval.ms の差分について
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] グループがリバランスしているため、ハートビートが失敗しました。
-
[解決済み] Kafkaメッセージにカスタムヘッダを追加する
-
[解決済み] kafkaとnifiの違い
-
[解決済み] Kafkaのキー/バリューペアベースのメッセージングは何のためにあるのですか?[クローズド]
-
[解決済み] HDFスキーマレジストリとConfluentの主な違いは何ですか?
-
[解決済み] Apache Kafkaのコンテキストで「Rebalancing」とはどういう意味ですか?
-
[解決済み] KafkaよりRabbitMQを使うべきタイミングは?[クローズド]
-
[解決済み】Kafkaトピックのパージ
-
[解決済み] Kafka 0.8.1.1でのトピックの削除
-
[解決済み] Kafka >= 0.10.1 における session.timeout.ms と max.poll.interval.ms の差分について