[解決済み] DataFrameのパーティショニングはどのように定義するのですか?
質問
Spark 1.4.0からSpark SQLとDataFramesを使い始めました。 ScalaでDataFramesにカスタムパーティショナーを定義したいのですが、その方法がわかりません。
私が作業しているデータテーブルの1つは、以下の例のようなアカウントごとのトランザクションのリストを含んでいます。
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
少なくとも最初は、計算のほとんどはアカウント内のトランザクション間で行われます。 そのため、あるアカウントのすべてのトランザクションが同じ Spark パーティションにあるように、データをパーティション化したいと思います。
しかし、これを定義する方法が見当たりません。 DataFrame クラスには 'repartition(Int)' というメソッドがあり、作成するパーティションの数を指定することができます。 しかし、RDDに指定できるような、DataFrameのカスタムパーティショナーを定義するためのメソッドは見当たりません。
ソースデータは Parquet で保存されています。 DataFrame を Parquet に書き込むときに、パーティション分割する列を指定できることがわかりましたので、おそらく Parquet に 'Account' 列でデータをパーティション分割するように指示することができます。 しかし、何百万ものアカウントがある可能性があり、私が正しく理解していれば、Parquet はアカウントごとに個別のディレクトリを作成することになるため、合理的な解決策とは思えませんでした。
Spark にこの DataFrame をパーティショニングさせ、Account のすべてのデータが同じパーティションにあるようにする方法はありますか?
どのように解決するのですか?
Spark 2.3.0を使用しています。
SPARK-22614 は、レンジパーティショニングを公開します。
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
SPARK-22389 は、外部フォーマットパーティショニングを データソース API v2 .
スパーク >= 1.6.0
Spark >= 1.6では、クエリやキャッシュにカラムによるパーティショニングを使用することが可能です。参照してください。
SPARK-11410
と
スパーク-4849
を使用して
repartition
メソッドを使用しています。
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
とは異なり
RDDs
スパーク
Dataset
(を含む
Dataset[Row]
を含む
DataFrame
) は、今のところカスタムパーティショナーを使用することができません。人工的なパーティショニングカラムを作成することで一般的に対処できますが、同じ柔軟性を与えることはできません。
Spark 1.6.0 を参照してください。
できることの一つは、入力データをあらかじめパーティション分けしてから
DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
以降
DataFrame
からの生成は
RDD
からの生成は、既存のパーティションレイアウトを保持する単純なマップフェーズのみを必要とします*。
assert(df.rdd.partitions == partitioned.partitions)
同じように、既存の
DataFrame
:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
というわけで、不可能ではなさそうです。問題は、それが全く意味をなさないかどうかです。私は、ほとんどの場合、それは意味がないと主張します。
-
再パーティショニングは高価な処理です。典型的なシナリオでは、ほとんどのデータをシリアル化し、シャッフルし、デシリアライズする必要があります。一方、事前にパーティショニングされたデータから恩恵を受けることができる操作の数は比較的少なく、内部 API がこの特性を活用するように設計されていない場合は、さらに制限されます。
- はいくつかのシナリオでジョインしますが、内部サポートが必要になります。
- マッチングパーティショナーによるウィンドウ関数呼び出し。上記と同じで、1つのウィンドウ定義に限定されます。しかし、すでに内部でパーティショニングされているので、事前のパーティショニングは冗長になるかもしれません。
-
による単純な集計。
GROUP BY
- を使うことで、一時的なバッファのメモリ使用量を減らすことができます**が、全体的なコストはかなり高くなります。多かれ少なかれgroupByKey.mapValues(_.reduce)
(現在の動作) とreduceByKey
(パーティション分割前)の比較。実際に役に立つとは思えません。 -
によるデータ圧縮は
SqlContext.cacheTable
. ランレングスエンコーディングを使用しているようなのでOrderedRDDFunctions.repartitionAndSortWithinPartitions
を適用すると圧縮率が向上する可能性があります。
-
パフォーマンスは、キーの分布に大きく依存します。分布が偏っている場合、リソースの利用が最適化されないことになります。最悪の場合、ジョブを終了することはまったく不可能になります。
- 高レベルの宣言的なAPIを使用することの要点は、低レベルの実装の詳細から自分自身を分離することです。すでに @dwysakowicz と ロミ・クンツマン の仕事であり、最適化は 触媒オプティマイザ . それはかなり洗練された獣であり、私は本当にあなたがその内部にはるかに深くダイビングせずに簡単にそれを改善することができます疑う。
関連する概念
JDBCソースでのパーティショニング :
JDBC データソースのサポート
predicates
引数
. 以下のように使用することができます。
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
これは、述語ごとに1つのJDBCパーティションを作成します。もし、個々の述語を使って作られたセットが不連続でない場合、結果のテーブルに重複が見られることに注意してください。
partitionBy
メソッドで
DataFrameWriter
:
スパーク
DataFrameWriter
提供
partitionBy
このメソッドは、書き込み時にデータを分割するために使用されます。これは、提供されたカラムのセットを使用して、書き込み時にデータを分離します。
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
これは、キーに基づくクエリのためのreadでの述語のプッシュダウンを可能にします。
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
と同じですが,これは
DataFrame.repartition
. 特に集合体のような
val cnts = df1.groupBy($"k").sum()
が必要な場合でも
TungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
メソッドで
DataFrameWriter
(Spark >= 2.0)を参照してください。
bucketBy
と同じような用途があります。
partitionBy
と同様の用途がありますが、テーブルに対してのみ有効です (
saveAsTable
). バケット情報は結合を最適化するために使用することができます。
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* 以下のとおりです。
パーティション・レイアウト
データの分布だけを意味します。
partitioned
RDDにはもはやパーティショナーはない。
** 初期投影がないと仮定します。集計が列の小さなサブセットのみをカバーする場合、おそらく何の利益もありません。
関連
-
[解決済み] Scalaで「:+」は何を意味するのか
-
[解決済み] PandasでDataFrameの行を反復処理する方法
-
[解決済み] 列の値に基づいてDataFrameから行を選択するにはどうすればよいですか?
-
[解決済み] Pandas DataFrameからカラムを削除する
-
[解決済み] Pandasのデータフレームで複数の列を選択する
-
[解決済み] Pandas DataFrameの行数を取得する方法は?
-
[解決済み] データフレームの行を複数の列でソート(並び替え)する。
-
[解決済み] 既存のDataFrameに新しい列を追加する方法は?
-
[解決済み] 一行ずつ追加してPandas Dataframeを作成する
-
[解決済み】Pandas DataFrameのカラムヘッダからリストを取得する。
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Scalaでループから抜け出すにはどうしたらいいですか?
-
[解決済み] Scala: スライド(N,N) vs グループ化(N)
-
[解決済み] TimeoutExceptionが発生した場合、どのような原因が考えられるでしょうか。Sparkで作業しているときに[n秒]後にFuturesがタイムアウトしました[重複]。
-
[解決済み] ScalaのDSLって何?[クローズド]
-
[解決済み] SparkSQL - パーケットファイルを直接読み込む
-
[解決済み] Scalaのapply関数とは何ですか?
-
[解決済み】Scalaで`:_*`(コロン・アンダースコア・スター)は何をするのですか?
-
[解決済み] 2つのマップをマージし、同じキーの値を合計するための最良の方法?
-
[解決済み] Scalaの識別子 "implicitly "とは?
-
[解決済み] Build.scala、%および%%の記号の意味