Pyspark。複数の配列の列を行に分割する
2023-07-08 16:10:41
質問
1つの行といくつかの列を持つデータフレームがあります。いくつかの列は単一の値であり、他の列はリストです。すべてのリスト列は、同じ長さです。 リスト以外の列はそのままにして、各リスト列を別の行に分割したいと思います。
サンプルDFです。
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
私が欲しいもの
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
リストカラムが1つしかなければ
explode
:
df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
しかし、もし私が
explode
を指定すると
c
カラムを使用すると、必要な長さの二乗のデータフレームが作成されます。
df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
私が欲しいのは - 各列について、その列の配列のn番目の要素を取り、それを新しい行に追加することです。私はデータフレーム内のすべての列にexplodeをマッピングしようとしましたが、それもうまくいかないようです。
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
どのように解決するのですか?
Spark >= 2.4
を置き換えることができます。
zip_
udf
を
arrays_zip
機能
from pyspark.sql.functions import arrays_zip, col, explode
(df
.withColumn("tmp", arrays_zip("b", "c"))
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.b"), col("tmp.c"), "d"))
スパーク 2.4
とは
DataFrames
とUDFを使用します。
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
とは
RDDs
:
(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))
どちらの解決策も、Pythonの通信オーバーヘッドにより非効率的です。データサイズが固定であれば、このようにすることができます。
from functools import reduce
from pyspark.sql import DataFrame
# Length of array
n = 3
# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")
とかでもいい。
from pyspark.sql.functions import array, struct
# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))
(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
UDFやRDDと比較して、大幅に高速化されるはずです。任意の数のカラムをサポートするように一般化。
# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))
df.withColumn("tmp", zip_and_explode("b", "c", n=3))
関連
-
[解決済み] リストを均等な大きさの塊に分割するには?
-
[解決済み] Pandasのデータフレームで複数の列を選択する
-
[解決済み] pandasを使った "大量データ "ワークフロー【終了しました
-
[解決済み] pandas GroupByを使ってグループごとの統計情報(カウント、平均値など)を取得する?
-
[解決済み] pandas が他の列の値に基づいて新しい列を作成する / 複数の列の関数を行単位で適用する
-
[解決済み】Pandas DataFrameのカラムヘッダからリストを取得する。
-
[解決済み】pandasでカラムの種類を変更する
-
[解決済み】複数のリストをdataframeに取り込む
-
[解決済み】pandas dataframeの文字列エントリーを分割(explode)して別の行にする。
-
[解決済み] pipの依存性/必要条件をリストアップする方法はありますか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Pandasのデータフレームでタプルの列を分割するにはどうしたらいいですか?
-
[解決済み] SQLAlchemy: セッションの作成と再利用
-
[解決済み] Pythonのargparseを使った隠し引数の作成
-
[解決済み] データフレームをソートした後にインデックスを更新する
-
[解決済み] python-requests モジュールからのすべてのリクエストをログに記録します。
-
[解決済み] スペースがないテキストを単語のリストに分割する方法
-
[解決済み] オブジェクトのリストに特定の属性値を持つオブジェクトが含まれているかどうかをチェックする
-
[解決済み] 異なる順序で同じ要素を持つ2つのJSONオブジェクトを等しく比較するには?
-
[解決済み] Pythonの文字列書式をリストで使う
-
[解決済み] Django filter queryset __in for *every* item in list