1. ホーム
  2. apache-spark

[解決済み] spark.sql.autoBroadcastJoinThresholdは、Datasetの結合演算子を使用して結合するために動作しますか?

2022-02-03 19:25:06

質問

知りたいのは spark.sql.autoBroadcastJoinThreshold プロパティは、結合スキームがSpark SQLの代わりにDataset API結合を使用している場合でも、すべてのワーカーノードで(結合中に)小さいテーブルをブロードキャストするのに便利です。

大きい方のテーブルが250ギガ、小さい方のテーブルが20ギガの場合、この設定は必要でしょうか? spark.sql.autoBroadcastJoinThreshold = テーブル全体を送信するために、21ギガ(多分 Dataset をすべてのワーカーノードに送信しますか?

:

  • データセットAPIへの参加

    val result = rawBigger.as("b").join(
      broadcast(smaller).as("s"),
      rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID), 
      "left_outer"
    )
    
    
  • SQL

    select * 
    from rawBigger_table b, smaller_table s
    where b.campign_id = s.campaign_id;
    
    

解決方法は?

まずはじめに spark.sql.autoBroadcastJoinThresholdbroadcast ヒントは別のメカニズムです。たとえ autoBroadcastJoinThreshold を設定することはできません。 broadcast のヒントが優先されます。デフォルトの設定の場合。

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

String = 10485760

val df1 = spark.range(100)
val df2 = spark.range(100)

Sparkは autoBroadcastJoinThreshold で、自動的にデータをブロードキャストします。

df1.join(df2, Seq("id")).explain

== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

自動ブロードキャストを無効にすると、Spark は標準の SortMergeJoin :

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain

== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

を使用することを強制することができます。 BroadcastHashJoinbroadcast のヒントが得られます。

df1.join(broadcast(df2), Seq("id")).explain

== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

SQLには独自のヒント形式があります(Hiveで使われているものと似ています)。

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
 "SELECT  /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain

== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

ということで、ご質問にお答えします。 autoBroadcastJoinThreshold を使用する場合に適用されます。 Dataset APIを使用する場合は関係ありませんが、明示的な broadcast のヒントがあります。

さらに、大きなオブジェクトをブロードキャストしても、パフォーマンスが向上することはほとんどなく、実際にはパフォーマンスを低下させ、安定性の問題が発生することがよくあります。ブロードキャストされたオブジェクトは、まずドライバに取り込まれ、次に各ワーカーに送られ、最後にメモリにロードされる必要があることを思い出してください。