[解決済み] データセットにカスタムオブジェクトを格納する方法は?
質問
によると Sparkデータセットの紹介 :
Spark 2.0に向けて、特にDatasetsにエキサイティングな改良を予定しています。 ... カスタムエンコーダー - 現在、様々なタイプのエンコーダーを自動生成していますが、カスタムオブジェクトのためのAPIを開放したいと思います。
で、カスタムタイプを保存しようとすると
Dataset
のようなエラーが発生します。
データセットに格納されている型のエンコーダが見つかりません。プリミティブ型 (Int, String, etc) と プロダクト型 (case class) は sqlContext.implicits._ をインポートすることでサポートされます。 他の型のシリアライズのサポートは、将来のリリースで追加される予定です。
または
Java.lang.UnsupportedOperationException: No Encoder found for ....
既存の回避策はありますか?
この質問は、コミュニティWikiの回答への入り口としてのみ存在することに注意してください。質問と回答の両方を自由に更新/改善することができます。
解決方法を教えてください。
更新情報
2.2/2.3では、ビルトインエンコーダのサポートが追加され、より良くなりましたが、この回答はまだ有効であり、有益なものです。
Set
,
Seq
,
Map
,
Date
,
Timestamp
および
BigDecimal
. ケースクラスと通常のScalaの型だけで型を作ることにこだわるなら,暗黙の了解である
SQLImplicits
.
残念ながら、事実上、何も追加されていません。検索すると
@since 2.0.0
で
Encoders.scala
または
SQLImplicits.scala
は、主にプリミティブ型に関係する事柄を発見しました(そして、ケースクラスにも若干の手を加えました)。ということで、まず最初に
カスタムクラスのエンコーダは、今のところあまり良いサポートがありません。
. この後、現在私たちが自由に使えるものを使って、できる限り良い仕事をするためのいくつかのトリックを紹介します。ただし、完璧に動作するわけではありませんし、制限を明確にするために最大限の努力をします。
具体的に何が問題なのか
データセットを作成する場合、Sparkはエンコーダー(T型のJVMオブジェクトを内部のSpark SQL表現と相互変換する)を必要とします。
SparkSession
の静的メソッドを呼び出して明示的に作成することもできます。
Encoders
からの引用です。
のドキュメントを参照してください。
createDataset
). エンコーダは次のような形式になります。
Encoder[T]
ここで
T
はエンコードしている型です。最初の提案は
import spark.implicits._
(これは
これら
を使って暗黙のエンコーダを明示的に渡すことです。
これ
エンコーダ関連関数のセットです。
通常のクラスには利用できるエンコーダがないので
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
を実行すると、以下のような暗黙の了解に関連するコンパイルタイムエラーが発生します。
データセットに格納されている型のエンコーダが見つかりません。プリミティブ型 (Int, String, etc) と プロダクト型 (case class) は sqlContext.implicits をインポートすることでサポートされます。
しかし、上記のエラーを発生させるために使用した型が何であれ、それを
Product
の場合、エラーは実行時に遅延されるため、混乱します。
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
コンパイルは正常に行われますが、実行時に次のように失敗します。
java.lang.UnsupportedOperationException: No Encoder found for MyObj(MyObjのエンコーダーが見つかりません。
この理由は、Sparkがインプリシットで作成するエンコーダは、実際には(scala relfectionを介して)実行時にのみ作成されるからです。この場合、Sparkがコンパイル時にチェックするのは、一番外側のクラスが
Product
(すべての case class がそうです) そして、実行時に初めて、まだ
MyObj
(を作ろうとすると、同じ問題が発生します)。
Dataset[(Int,MyObj)]
- Sparkは、実行時に嘔吐するのを待ちます。
MyObj
). これらは、修正することが切実に求められている中心的な問題です。
-
を拡張する一部のクラスは
Product
は、実行時に必ずクラッシュするにもかかわらずコンパイルされ -
ネストされた型のカスタムエンコーダーを渡す方法がない(Sparkに、単に
MyObj
をどのようにエンコードするのかがわかるようにWrap[MyObj]
または(Int,MyObj)
).
を使うだけです。
kryo
誰もが提案する解決策は
kryo
エンコーダーを使用します。
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
しかし、これはかなり面倒なことです。特に、コードがあらゆる種類のデータセットを操作し、結合やグループ化などを行っている場合です。結局、余分なインプリシットの束を積み重ねることになります。そこで、これをすべて自動で行う暗黙の了解を作ればいいのではないでしょうか。
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
そして今、ほとんど何でもできるようになったようです(下記の例は
spark-shell
ここで
spark.implicits._
は自動的にインポートされます)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
というかほとんど。問題は
kryo
は、Sparkがデータセットの各行をフラットなバイナリオブジェクトとして保存することにつながります。そのため
map
,
filter
,
foreach
のような操作では、それで十分です。
join
Sparkは、これらをカラムに分離する必要があります。のスキーマを検査すると
d2
または
d3
バイナリ列が1つだけであることがわかると思います。
d2.printSchema
// root
// |-- value: binary (nullable = true)
タプルの部分的な解決策
そこで、Scalaのimplicitの魔法を使い(詳しくは 6.26.3 オーバーロードの解決 ) 、少なくともタプルについては、できるだけ良い仕事をする一連の暗示を自分で作ることができ、既存の暗示とうまく機能することができます。
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
そして、これらのインプリシットで武装して、いくつかのカラムの名前を変更しながらも、上記の例を動作させることができます。
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
期待されるタプル名(
_1
,
_2
を、名前を変更せずにデフォルトで使用することができます。
これ
は、名前
"value"
が導入され
これ
は、通常タプル名を追加する場所です。しかし、重要なのは、これで立派な構造化スキーマができたということです。
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
つまり、要約すると、このワークアラウンドです。
- タプルに対して別々のカラムを得ることができる(これでまたタプルに結合できる、やったー!)。
-
を渡す必要はなく、再びインプリシットに依存することができます。
kryo
を使用します)。 -
とはほぼ完全に後方互換性があります。
import spark.implicits._
(若干の名前の変更が必要) -
は
ない
に参加させてください。
kyro
を持つフィールドはもちろんのこと、シリアライズされたバイナリカラムにも適用されます。 -
は、タプルカラムの名前を "value" に変更するという不快な副作用があります (必要であれば、これを元に戻すには
.toDF
そして、スキーマ名は最も必要とされる結合によって保存されるようです)。
クラス全般の部分的な解決策
こちらはあまり気持ちの良いものではありませんし、良い解決策もありません。しかし、今、私たちは上記のタプルの解決策を持っているので、私は別の答えからの暗黙の変換の解決策は、あなたのより複雑なクラスをタプルに変換することができるので、あまりにも少し痛みを軽減する予感がする。そして、データセットを作成した後、データフレームのアプローチでカラムの名前を変更することになるでしょう。すべてがうまくいけば、これは
本当に
クラスのフィールドで結合を実行できるようになったので、改善されました。もし、1つのフラットバイナリ
kryo
シリアライザーは、このようなことを可能にするものではありませんでした。
以下は、あらゆることを行う例です。
MyObj
という型のフィールドを持ち
Int
,
java.util.UUID
および
Set[String]
. 最初のものは、それ自体で処理されます。2番目は
kryo
として格納した方が便利です。
String
(以下
UUID
は、通常、結合したいものです)。3番目はバイナリカラムに入れるべきものです。
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
さて、この機械を使って、素敵なスキーマを持つデータセットを作ることができる。
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
そしてスキーマには、正しい名前のカラムと、最初の2つのカラムが表示されており、どちらも結合することができます。
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
関連
-
[解決済み] Scala forallの例?
-
[解決済み] expr() での lit() の使用について
-
[解決済み] Scalaで定期的に関数を実行する
-
[解決済み] 理解する `andThen`
-
[解決済み】Scalaでケースクラスのインスタンスをクローンして、1つのフィールドだけを変更するにはどうすればよいですか?
-
[解決済み】Scalaの名前による呼び出しと値による呼び出し、明確化の必要性
-
[解決済み】Scala 2.8における<:<、<%<、=:=の意味と、それらのドキュメントはどこにあるのか?
-
[解決済み] 縮小、折りたたみ、スキャン(左/右)?
-
[解決済み] 型の論理和(ユニオン型)はどのように定義するのですか?
-
[解決済み] Scalaの==と.equalsの違いは何ですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Scalaでsynchronizedを使うには?
-
[解決済み] Scala: スライド(N,N) vs グループ化(N)
-
[解決済み] Scalaの「コンテキストバウンド」とは何ですか?
-
[解決済み] SparkSQL - パーケットファイルを直接読み込む
-
[解決済み] Scalaでファイル全体を読む?
-
[解決済み】Scala 2.8 breakOut
-
[解決済み】ScalaのfoldLeftとreduceLeftの違いについて
-
[解決済み】ScalaのJavaConvertersとJavaConversionsの違いは何ですか?
-
[解決済み] Build.scala、%および%%の記号の意味
-
[解決済み] Spark SQLでカラムの降順でソートするには?