1. ホーム
  2. スカラ

[解決済み】タスクがシリアライズされない:オブジェクトではなくクラスに対してのみクロージャの外で関数を呼び出すとjava.io.NotSerializableExceptionが発生する

2022-04-02 07:57:05

質問

クロージャの外側で関数を呼び出すと、おかしな挙動になる。

  • 関数がオブジェクトの中にあるときはすべて動作している
  • 関数がクラス内にある場合、:

タスクはシリアライズできません: java.io.NotSerializableException: testing

問題は、私のコードがオブジェクトではなく、クラスで必要なことです。なぜこのようなことが起こるのか、何か思い当たることはありますか?Scalaのオブジェクトはシリアライズされているのでしょうか(デフォルト?)

これは動作するコード例です。

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

これは動作しない例です。

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

解決方法は?

RDDはSerialisableインタフェースを拡張する ということで、タスクが失敗する原因はこれではありません。だからといって RDD を使用し、Spark の NotSerializableException

Sparkは分散コンピューティングエンジンであり、その主な抽象化は弾力性のある分散データセット( RDD ) であり、分散コレクションと見なすことができる。基本的にRDDの要素はクラスタのノードに分割されていますが、Sparkはこれをユーザーから抽象化し、ユーザーはRDD(コレクション)をあたかもローカルなもののように操作することができるようになっています。

あまり細かいことは言いませんが、RDDに対して様々な変換を実行すると( map , flatMap , filter など)、あなたの変換コード(クロージャ)は。

  1. ドライバノードでシリアライズされます。
  2. クラスタ内の適切なノードに出荷されます。
  3. をデシリアライズする。
  4. そして最後にノード上で実行される

もちろん、(あなたの例のように)ローカルでこれを実行することもできますが、(ネットワーク経由の出荷を除けば)これらのフェーズはすべてそのまま発生します。[これによって、本番環境にデプロイする前にバグを発見することができます]。

2つ目のケースで起こることは、クラス testing を map 関数の内部から実行します。Sparkはそれを見て、メソッドを単独でシリアライズすることはできないので、Sparkは次のようにシリアライズしようとします。 全体 testing クラスは、別のJVMで実行されたときにコードがまだ動作するようにします。2つの可能性があります。

クラステストをシリアライズ可能にして、クラス全体をSparkでシリアライズできるようにするか。

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

または someFunc メソッドの代わりに関数(Scalaでは関数はオブジェクトです)を使用することで、Sparkはそれをシリアライズすることができます。

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

似たような、しかし同じではない、クラスのシリアライゼーションの問題は、あなたにとって興味深いものでしょう、あなたはそれについて読むことができます。 このSpark Summit 2013のプレゼンテーションでは .

余談ですが、このように rddList.map(someFunc(_)) から rddList.map(someFunc) というように、両者は全く同じです。通常、2番目の方が冗長でなく、すっきり読めるので好まれます。

を編集しました(2015-03-15)。 SPARK-5307 導入 SerializationDebugger で、Spark 1.3.0が最初の使用バージョンとなります。これは、シリアライゼーションパスを NotSerializableException . NotSerializableExceptionが発生すると、デバッガーはオブジェクトグラフを訪れてシリアライズできないオブジェクトへのパスを見つけ、ユーザーがそのオブジェクトを見つけるのに役立つ情報を構築します。

OPの場合、これが標準出力に出力されます。

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)