[解決済み】タスクがシリアライズされない:オブジェクトではなくクラスに対してのみクロージャの外で関数を呼び出すとjava.io.NotSerializableExceptionが発生する
質問
クロージャの外側で関数を呼び出すと、おかしな挙動になる。
- 関数がオブジェクトの中にあるときはすべて動作している
- 関数がクラス内にある場合、:
タスクはシリアライズできません: java.io.NotSerializableException: testing
問題は、私のコードがオブジェクトではなく、クラスで必要なことです。なぜこのようなことが起こるのか、何か思い当たることはありますか?Scalaのオブジェクトはシリアライズされているのでしょうか(デフォルト?)
これは動作するコード例です。
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
これは動作しない例です。
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
解決方法は?
RDDはSerialisableインタフェースを拡張する
ということで、タスクが失敗する原因はこれではありません。だからといって
RDD
を使用し、Spark の
NotSerializableException
Sparkは分散コンピューティングエンジンであり、その主な抽象化は弾力性のある分散データセット( RDD ) であり、分散コレクションと見なすことができる。基本的にRDDの要素はクラスタのノードに分割されていますが、Sparkはこれをユーザーから抽象化し、ユーザーはRDD(コレクション)をあたかもローカルなもののように操作することができるようになっています。
あまり細かいことは言いませんが、RDDに対して様々な変換を実行すると(
map
,
flatMap
,
filter
など)、あなたの変換コード(クロージャ)は。
- ドライバノードでシリアライズされます。
- クラスタ内の適切なノードに出荷されます。
- をデシリアライズする。
- そして最後にノード上で実行される
もちろん、(あなたの例のように)ローカルでこれを実行することもできますが、(ネットワーク経由の出荷を除けば)これらのフェーズはすべてそのまま発生します。[これによって、本番環境にデプロイする前にバグを発見することができます]。
2つ目のケースで起こることは、クラス
testing
を map 関数の内部から実行します。Sparkはそれを見て、メソッドを単独でシリアライズすることはできないので、Sparkは次のようにシリアライズしようとします。
全体
testing
クラスは、別のJVMで実行されたときにコードがまだ動作するようにします。2つの可能性があります。
クラステストをシリアライズ可能にして、クラス全体をSparkでシリアライズできるようにするか。
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
または
someFunc
メソッドの代わりに関数(Scalaでは関数はオブジェクトです)を使用することで、Sparkはそれをシリアライズすることができます。
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
似たような、しかし同じではない、クラスのシリアライゼーションの問題は、あなたにとって興味深いものでしょう、あなたはそれについて読むことができます。 このSpark Summit 2013のプレゼンテーションでは .
余談ですが、このように
rddList.map(someFunc(_))
から
rddList.map(someFunc)
というように、両者は全く同じです。通常、2番目の方が冗長でなく、すっきり読めるので好まれます。
を編集しました(2015-03-15)。 SPARK-5307 導入 SerializationDebugger で、Spark 1.3.0が最初の使用バージョンとなります。これは、シリアライゼーションパスを NotSerializableException . NotSerializableExceptionが発生すると、デバッガーはオブジェクトグラフを訪れてシリアライズできないオブジェクトへのパスを見つけ、ユーザーがそのオブジェクトを見つけるのに役立つ情報を構築します。
OPの場合、これが標準出力に出力されます。
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
関連
-
[解決済み] スパークSPLラウンド&ブラウンド
-
[解決済み] Scalaで定期的に関数を実行する
-
[解決済み] TimeoutExceptionが発生した場合、どのような原因が考えられるでしょうか。Sparkで作業しているときに[n秒]後にFuturesがタイムアウトしました[重複]。
-
[解決済み] Spark - CSVファイルをDataFrameとして読み込む?
-
[解決済み】Scalaにおける中括弧と括弧の正式な違い、また、どのような場合に使用すべきなのか?
-
[解決済み】コマンドラインパラメータを解析する最良の方法?[クローズド]
-
[解決済み] Scalaは、コレクションをMap-by-keyに変換する最良の方法ですか?
-
[解決済み] Build.scala、%および%%の記号の意味
-
[解決済み] Spark SQLでカラムの降順でソートするには?
-
[解決済み] Scalaのパターンマッチングシステムで比較演算子を使う
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Scalaでswitch/case(単純なパターンマッチ)を使うには?
-
[解決済み] expr() での lit() の使用について
-
[解決済み] Spark Implicit $ for DataFrame(データフレーム用暗黙の$)。
-
[解決済み] データフレームを複数回グループ化する
-
[解決済み] ScalaのDSLって何?[クローズド]
-
[解決済み】Scalaにおける中括弧と括弧の正式な違い、また、どのような場合に使用すべきなのか?
-
[解決済み】case objectとobjectの違いについて
-
[解決済み] 縮小、折りたたみ、スキャン(左/右)?
-
[解決済み] 型の論理和(ユニオン型)はどのように定義するのですか?
-
[解決済み] 2つのマップをマージし、同じキーの値を合計するための最良の方法?