1. ホーム
  2. python

[解決済み] Spark DataFrameに新しいカラムを追加するにはどうすればよいですか(PySparkを使用)?

2022-04-24 12:17:07

質問

Spark DataFrame (using PySpark 1.5.1) を持っていて、新しいカラムを追加したいのですが、どうすればいいですか?

以下を試しましたが、成功しませんでした。

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

また、これを使うとエラーになりました。

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

では、PySparkを使って既存のDataFrameに新しい列(Python vectorに基づく)を追加するにはどうすればよいのでしょうか?

どのように解決するのですか?

には、任意の列を追加することはできません。 DataFrame をSparkで使用することができます。新しいカラムを作成するには、リテラルを使用します(他のリテラルの型は、以下の章で説明します)。 Spark DataFrameに定数カラムを追加する方法は? )

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

既存のカラムを変換する。

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

を使用して含まれています。 join :

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

または関数 / udf で生成されます。

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

性能面では、ビルトイン関数 ( pyspark.sql.functions ) は、通常、Pythonのユーザー定義関数よりもCatalyst式にマッピングされることが推奨されます。

任意のRDDの内容を列として追加したい場合は、次のようにします。