[解決済み] reduceByKeyとgroupByKeyとaggregateByKeyとcombineByKeyの違いのスパーク
2023-05-29 21:35:25
質問
の違いを説明できる人はいますか?
reducebykey
,
groupbykey
,
aggregatebykey
と
combinebykey
? これに関するドキュメントを読みましたが、正確な違いを理解することができませんでした。
例を挙げての説明があると助かります。
どのように解決するのですか?
groupByKeyです。
構文です。
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey
は、データがネットワーク経由で送信され、縮小されたワーカーに収集されるため、ディスク外の問題を引き起こす可能性があります。
reduceByKeyです。
構文です。
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
データは各パーティションで結合され、ネットワーク上に送信するのは各パーティションの1つのキーに対して1つの出力のみとなります。
reduceByKey
は、すべての値をまったく同じ型を持つ別の値に結合する必要があります。
aggregateByKeyです。
と同じ
reduceByKey
と同じで、初期値をとります。
入力として3つのパラメータ
- 初期値
- コンバイナロジック
- シーケンスオペレーションロジック
例
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
を出力します。 キーで集計する 結果 バー -> 3 foo -> 5
combineByKeyです。
3つのパラメータを入力として
-
初期値:似て非なるもの
aggregateByKey
とは異なり、常に定数を渡す必要はなく、新しい値を返す関数を渡すことができます。 - マージ関数
- コンバイン機能
例
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
reduceByKey
,
aggregateByKey
,
combineByKey
より優先される
groupByKey
関連
-
[解決済み] PySparkのデータフレームで、各キーのパーセンタイルはどのように計算されますか?
-
[解決済み] Spark が "java.net.URISyntaxException" を報告するのはなぜですか?DataFrameを使用する際に「java.net.URIStyntaxException: Relative path in absolute URI」と表示されるのはなぜですか?
-
[解決済み] format("kafka") で "Failed to find data source: kafka." とエラーになるのはなぜですか?(uber-jarを使用しても)失敗しますか?
-
[解決済み] Spark: 2つのDataFrameを減算する
-
[解決済み] sparkでsaveAsTextFileするときのファイル名の付け方は?
-
[解決済み] spark.yarn.executor.memoryOverhead "の設定値?
-
[解決済み] プロパティspark.yarn.jars - どのようにそれに対処するのですか?
-
[解決済み】mapとflatMapの違いと、それぞれの良い使用例について教えてください。
-
[解決済み】SparkコンソールにINFOメッセージを表示させないようにするには?
-
[解決済み] RDD/Spark DataFrameの特定の列に基づく行からの重複の除去
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み】spark.driver.maxResultSizeとは何ですか?
-
[解決済み] spark.sql.autoBroadcastJoinThresholdは、Datasetの結合演算子を使用して結合するために動作しますか?
-
[解決済み] Apache SparkとAkkaの比較【終了しました
-
[解決済み] Spark: 2つのDataFrameを減算する
-
[解決済み] spark checkpointとpersist to a diskの違いは何ですか?
-
[解決済み] プロパティspark.yarn.jars - どのようにそれに対処するのですか?
-
[解決済み】Spark Dataframeで列の内容をすべて表示するにはどうすればよいですか?
-
[解決済み】SparkコンソールにINFOメッセージを表示させないようにするには?
-
[解決済み] 複数のテキストファイルを1つのRDDに読み込むには?
-
[解決済み] Apache BeamがSpark/Flinkよりもバッチ処理に優れている点は何ですか?