1. ホーム
  2. python

[解決済み] RDD のインスタンスが必要だが、クラス 'pyspark.rdd.PipelinedRDD' が返された。

2022-02-09 16:06:34

質問

こんにちは、私はノートブックでこのコードを持っているとpythonのスパークをコーディングするためにtraying。

 mydataNoSQL.createOrReplaceTempView("mytable")
 spark.sql("SELECT * from mytable")
 return mydataNoSQL

def getsameData(df,spark):
result = spark.sql("select * from mytable where temeperature is not null")
return result.rdd.sample(False, 0.1).map(lambda row : (row.temperature))

RDD のインスタンスが必要ですが、クラス 'pyspark.rdd.PipelinedRDD' が表示されます。

何かお手伝いできることがあれば、よろしくお願いします。

解決方法は?

pyspark.rdd.PipelinedRDD のサブクラスです。 RDD で定義されたすべての API を備えていなければなりません。たとえば、PipelinedRDD は RDD に対してmap関数を実行すると作成されます。 RDD .

例えば、以下のスニペットを見てください。

>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD

ということで pyspark.rdd.PipelinedRDD と同じように RDD をコードに追加してください。

Pythonは動的型付け言語であるため、完全なキャスティングのサポートはありません。 pyspark.rdd.PipelinedRDD を通常のRDDに変換し、rddに集めて並列化することができます。

>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>

実行中 collect が発生することがあります。 MemoryError は、RDD のデータが大きい場合。