1. ホーム
  2. python

[解決済み] PySparkのDataFrameで平均と標準偏差を計算するには?

2022-02-10 21:19:28

質問

PySparkのDataFrame( pandasではありません。 という名前の df を使うのはかなり大きい。 collect() . したがって、以下のコードは効率的ではありません。より少ないデータ量で動作していましたが、現在では失敗しています。

import numpy as np

myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)

を取得する方法はありますか? meanstd を2つの変数として使用します。 pyspark.sql.functions のようなものでしょうか?

from pyspark.sql.functions import mean as mean_, std as std_

を使うことができました。 withColumn しかし、この方法では、行ごとに計算が適用され、単一の変数が返されません。

UPDATEしてください。

のサンプルコンテンツ df :

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|

の平均と標準偏差を計算しなければならない。 score の値、例えば 1[691,1] はスコアの1つです。

解決方法は?

内蔵の関数を使って、集計された統計情報を得ることができます。ここでは、平均と標準偏差を取得する方法を説明します。

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

標準偏差の関数には3種類あることに注意してください。ドキュメントによると、私が使ったのは ( stddev )は次のように返します。

集計関数: の不偏標本標準偏差を返す。 グループ内の表現

を使用することができます。 describe() というメソッドもあります。

df.describe().show()

詳しくはこちらをご覧ください。 pyspark.sql.functions

アップデイト : このように、入れ子になっているデータを操作することができます。

使用方法 explode を使用して値を個別の行に抽出し、次に meanstddev のようになります。

ここにMWEがあります。

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

どの出力か。

[2.3333333333333335, 1.505545305418162]

これらの値が正しいかどうかを確認するには numpy :

vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])

説明 あなたの "products" カラムは listlist s. 呼び出し explode の各要素に対して新しい行を作成します。 list . 次に "score" の2番目の要素として定義した、分解された各行からの値です。 list . 最後に、この新しい列に対して集約関数を呼び出します。