[解決済み] spark dataframeの書き込みメソッドで特定のパーティションを上書きする
2023-02-27 22:14:34
質問
spark ですべてではなく、特定のパーティションを上書きしたいのです。以下のコマンドを試しています。
df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
ここで、dfは上書きされるインクリメンタルデータを持つデータフレームです。
hdfs-base-pathにはマスターデータが格納されています。
上記のコマンドを試すと、すべてのパーティションが削除され、dfに存在するパーティションがhdfsパスに挿入されます。
私の要件は、指定された hdfs パスの df に存在するパーティションのみを上書きすることです。どなたか、この件で私を助けていただけませんか。
どのように解決するのですか?
これはよくある問題です。2.0までのSparkでは、パーティションディレクトリに直接書き込むことで、唯一の解決策となります(例)。
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
2.0より前のSparkを使用している場合、Sparkがメタデータファイルを出力しないようにする必要があります(自動パーティション発見ができなくなるため)。
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
1.6.2以前のSparkを使っている場合は
_SUCCESS
のファイルを削除する必要があります。
/root/path/to/data/partition_col=value
ファイルがないと、パーティションの自動検出ができなくなります。(1.6.2 以降を使用することを強くお勧めします)。
大規模なパーティショニングされたテーブルを管理する方法について、私の Spark Summit での講演でもう少し詳しく知ることができます。 防弾ジョブ .
関連
-
[解決済み】spark.driver.maxResultSizeとは何ですか?
-
[解決済み] spark 2.4.4 をインストールした後に pyspark を実行しようとすると、「TypeError: an integer is required (got type bytes)」というエラーが発生するのを修正する方法
-
[解決済み] Sparkのバージョンを確認する方法【終了しました
-
[解決済み] spark checkpointとpersist to a diskの違いは何ですか?
-
[解決済み] プロパティspark.yarn.jars - どのようにそれに対処するのですか?
-
[解決済み] Spark - repartition() vs coalesce()
-
[解決済み】Spark StandaloneクラスタのWorker、Executor、Coreとは何ですか?
-
[解決済み] DataFrameのパーティショニングはどのように定義するのですか?
-
[解決済み] Sparkジョブがorg.apache.spark.shuffle.MetadataFetchFailedExceptionで失敗する理由は何ですか?Shuffle 0 の投機モードでの出力場所がない?
-
[解決済み] sparkのexecutor数、cores、executor memoryのチューニング方法は?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Spark が "java.net.URISyntaxException" を報告するのはなぜですか?DataFrameを使用する際に「java.net.URIStyntaxException: Relative path in absolute URI」と表示されるのはなぜですか?
-
[解決済み] format("kafka") で "Failed to find data source: kafka." とエラーになるのはなぜですか?(uber-jarを使用しても)失敗しますか?
-
[解決済み] ShuffledRDD、MapPartitionsRDD、ParallelCollectionRDDの違いは何ですか?
-
[解決済み] Sparkクラスタがハートビートのタイムアウトでいっぱいになり、エグゼキュータが勝手に終了してしまう。
-
[解決済み] pyspark : NameError: name 'spark' is not defined.
-
[解決済み] 実行中のSparkアプリケーションを終了させるには?
-
[解決済み】mapとflatMapの違いと、それぞれの良い使用例について教えてください。
-
[解決済み】Spark Dataframeで列の内容をすべて表示するにはどうすればよいですか?
-
[解決済み] 複数のテキストファイルを1つのRDDに読み込むには?
-
[解決済み] Apache BeamがSpark/Flinkよりもバッチ処理に優れている点は何ですか?