[解決済み] spark.sql.autoBroadcastJoinThresholdは、Datasetの結合演算子を使用して結合するために動作しますか?
質問
知りたいのは
spark.sql.autoBroadcastJoinThreshold
プロパティは、結合スキームがSpark SQLの代わりにDataset API結合を使用している場合でも、すべてのワーカーノードで(結合中に)小さいテーブルをブロードキャストするのに便利です。
大きい方のテーブルが250ギガ、小さい方のテーブルが20ギガの場合、この設定は必要でしょうか?
spark.sql.autoBroadcastJoinThreshold
= テーブル全体を送信するために、21ギガ(多分
Dataset
をすべてのワーカーノードに送信しますか?
例 :
-
データセットAPIへの参加
val result = rawBigger.as("b").join( broadcast(smaller).as("s"), rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID), "left_outer" )
-
SQL
select * from rawBigger_table b, smaller_table s where b.campign_id = s.campaign_id;
解決方法は?
まずはじめに
spark.sql.autoBroadcastJoinThreshold
と
broadcast
ヒントは別のメカニズムです。たとえ
autoBroadcastJoinThreshold
を設定することはできません。
broadcast
のヒントが優先されます。デフォルトの設定の場合。
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)
Sparkは
autoBroadcastJoinThreshold
で、自動的にデータをブロードキャストします。
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
自動ブロードキャストを無効にすると、Spark は標準の
SortMergeJoin
:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *Range (0, 100, step=1, splits=Some(8))
+- *Sort [id#3L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
を使用することを強制することができます。
BroadcastHashJoin
で
broadcast
のヒントが得られます。
df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
SQLには独自のヒント形式があります(Hiveで使われているものと似ています)。
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql(
"SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=8)
ということで、ご質問にお答えします。
autoBroadcastJoinThreshold
を使用する場合に適用されます。
Dataset
APIを使用する場合は関係ありませんが、明示的な
broadcast
のヒントがあります。
さらに、大きなオブジェクトをブロードキャストしても、パフォーマンスが向上することはほとんどなく、実際にはパフォーマンスを低下させ、安定性の問題が発生することがよくあります。ブロードキャストされたオブジェクトは、まずドライバに取り込まれ、次に各ワーカーに送られ、最後にメモリにロードされる必要があることを思い出してください。
関連
-
[解決済み】spark.driver.maxResultSizeとは何ですか?
-
[解決済み] spark.sql.autoBroadcastJoinThresholdは、Datasetの結合演算子を使用して結合するために動作しますか?
-
[解決済み] スパーク "プランの文字列表現が大きすぎたため、切り捨てました。" 手動で作成した集計式を使用した場合の警告
-
[解決済み] Spark が "java.net.URISyntaxException" を報告するのはなぜですか?DataFrameを使用する際に「java.net.URIStyntaxException: Relative path in absolute URI」と表示されるのはなぜですか?
-
[解決済み] Apache SparkとAkkaの比較【終了しました
-
[解決済み] format("kafka") で "Failed to find data source: kafka." とエラーになるのはなぜですか?(uber-jarを使用しても)失敗しますか?
-
[解決済み] Sparkクラスタがハートビートのタイムアウトでいっぱいになり、エグゼキュータが勝手に終了してしまう。
-
[解決済み] TypeError: 'Column' オブジェクトは WithColumn を使用して呼び出すことができません。
-
[解決済み] Spark - repartition() vs coalesce()
-
[解決済み】Spark StandaloneクラスタのWorker、Executor、Coreとは何ですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] spark.sql.autoBroadcastJoinThresholdは、Datasetの結合演算子を使用して結合するために動作しますか?
-
[解決済み] SparkでcreateOrReplaceTempViewはどのように動作するのですか?
-
[解決済み] スパーク "プランの文字列表現が大きすぎたため、切り捨てました。" 手動で作成した集計式を使用した場合の警告
-
[解決済み] PySparkのデータフレームで、各キーのパーセンタイルはどのように計算されますか?
-
[解決済み] Spark: 2つのDataFrameを減算する
-
[解決済み] Sparkクラスタがハートビートのタイムアウトでいっぱいになり、エグゼキュータが勝手に終了してしまう。
-
[解決済み] pyspark : NameError: name 'spark' is not defined.
-
[解決済み] spark checkpointとpersist to a diskの違いは何ですか?
-
[解決済み】SparkコンソールにINFOメッセージを表示させないようにするには?
-
[解決済み】Spark StandaloneクラスタのWorker、Executor、Coreとは何ですか?