[解決済み】Spark Exponential Moving Averageについて
2022-02-06 06:09:05
質問
ID、日付、価格を持つ時系列価格データのデータフレームがあります。
価格カラムの指数移動平均を計算し、データフレームに新しいカラムとして追加する必要があります。
以前からSparkの窓関数は使っていて、今回のユースケースには合っているように見えたのですが、EMAの計算式を考えると
EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)
ここで
multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now
実際に列の上をウィンドウで移動しながら、列の前の計算された値にアクセスするにはどうしたらいいか、ちょっと混乱しました。 単純な移動平均では、ウィンドウ内の要素を平均化しながら新しい列を計算すればよいので、簡単です。
var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))
しかし、EMAの場合は、各ステップで前の計算値が必要なので、少し複雑になるようです。
も見てみました。 Pysparkの加重移動平均 しかし、Spark/Scala用のアプローチが必要で、10日または30日のEMAのために。
何かアイデアはありますか?
解決方法は?
最後に、指数移動平均がpandasのdataframeでどのように実装されているかを分析しました。上で説明した再帰的な式の他に、SQLやwindow関数で実装するのが難しい(再帰的なので)別の式があります。 課題追跡システム :
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).
これを踏まえて、さらにSpark実装のヘルプとして こちら というのとほぼ同等の実装になりました。 pandas_dataframe.ewm(span=window_size).mean() .
def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
val window = Window.partitionBy(partitionColumn)
val exponentialMovingAveragePrefix = "_EMA_"
val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
val alpha = 2.0 / (windowSize + 1)
val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
}
(adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
})
dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
.withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
.drop("row_nr")
}
(指数移動平均を計算する列の型は Double と仮定しています。)
他の方の参考になれば幸いです。
関連
-
[解決済み] java.lang.NoClassDefFoundError: scala/Product$class
-
[解決済み] GSON JsonObject "Unsupported Operation Exception: null" getAsString
-
[解決済み] AWSのためのScala SDKまたはインターフェースはありますか?
-
[解決済み] Seq[Future[Person]]ではなく、Future[Seq[Person]]を取得する方法
-
[解決済み] Spark - Sparkでパーセンタイルを計算する方法は?
-
[解決済み] sbtのlibraryDependenciesで言うところの++=と+=の違いは何ですか?
-
[解決済み] ScalaのDSLって何?[クローズド]
-
[解決済み】Scalaでケースクラスのインスタンスをクローンして、1つのフィールドだけを変更するにはどうすればよいですか?
-
[解決済み】Akka Kill vs. Stop vs. Poison Pill?
-
[解決済み] Scalaの==と.equalsの違いは何ですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み】scalacコンパイルで "object apache is not a member of package org "と表示される。
-
[解決済み] Scala forallの例?
-
[解決済み] ScalaのSeqへのアペンド
-
[解決済み] NotUsedとDoneの理解
-
[解決済み] Scalaの「コンテキストバウンド」とは何ですか?
-
[解決済み] Scalaのオブジェクトとクラスの違い
-
[解決済み] Scalaでファイル全体を読む?
-
[解決済み】タイプセーフのenum型をモデル化する方法は?
-
[解決済み] Scalaは、コレクションをMap-by-keyに変換する最良の方法ですか?
-
[解決済み] データセットにカスタムオブジェクトを格納する方法は?