1. ホーム
  2. scala

[解決済み] データセットにカスタムオブジェクトを格納する方法は?

2022-04-26 09:23:34

質問

によると Sparkデータセットの紹介 :

Spark 2.0に向けて、特にDatasetsにエキサイティングな改良を予定しています。 ... カスタムエンコーダー - 現在、様々なタイプのエンコーダーを自動生成していますが、カスタムオブジェクトのためのAPIを開放したいと思います。

で、カスタムタイプを保存しようとすると Dataset のようなエラーが発生します。

データセットに格納されている型のエンコーダが見つかりません。プリミティブ型 (Int, String, etc) と プロダクト型 (case class) は sqlContext.implicits._ をインポートすることでサポートされます。 他の型のシリアライズのサポートは、将来のリリースで追加される予定です。

または

Java.lang.UnsupportedOperationException: No Encoder found for ....

既存の回避策はありますか?


この質問は、コミュニティWikiの回答への入り口としてのみ存在することに注意してください。質問と回答の両方を自由に更新/改善することができます。

解決方法を教えてください。

更新情報

2.2/2.3では、ビルトインエンコーダのサポートが追加され、より良くなりましたが、この回答はまだ有効であり、有益なものです。 Set , Seq , Map , Date , Timestamp および BigDecimal . ケースクラスと通常のScalaの型だけで型を作ることにこだわるなら,暗黙の了解である SQLImplicits .


残念ながら、事実上、何も追加されていません。検索すると @since 2.0.0 Encoders.scala または SQLImplicits.scala は、主にプリミティブ型に関係する事柄を発見しました(そして、ケースクラスにも若干の手を加えました)。ということで、まず最初に カスタムクラスのエンコーダは、今のところあまり良いサポートがありません。 . この後、現在私たちが自由に使えるものを使って、できる限り良い仕事をするためのいくつかのトリックを紹介します。ただし、完璧に動作するわけではありませんし、制限を明確にするために最大限の努力をします。

具体的に何が問題なのか

データセットを作成する場合、Sparkはエンコーダー(T型のJVMオブジェクトを内部のSpark SQL表現と相互変換する)を必要とします。 SparkSession の静的メソッドを呼び出して明示的に作成することもできます。 Encoders からの引用です。 のドキュメントを参照してください。 createDataset ). エンコーダは次のような形式になります。 Encoder[T] ここで T はエンコードしている型です。最初の提案は import spark.implicits._ (これは これら を使って暗黙のエンコーダを明示的に渡すことです。 これ エンコーダ関連関数のセットです。

通常のクラスには利用できるエンコーダがないので

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

を実行すると、以下のような暗黙の了解に関連するコンパイルタイムエラーが発生します。

データセットに格納されている型のエンコーダが見つかりません。プリミティブ型 (Int, String, etc) と プロダクト型 (case class) は sqlContext.implicits をインポートすることでサポートされます。

しかし、上記のエラーを発生させるために使用した型が何であれ、それを Product の場合、エラーは実行時に遅延されるため、混乱します。

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

コンパイルは正常に行われますが、実行時に次のように失敗します。

java.lang.UnsupportedOperationException: No Encoder found for MyObj(MyObjのエンコーダーが見つかりません。

この理由は、Sparkがインプリシットで作成するエンコーダは、実際には(scala relfectionを介して)実行時にのみ作成されるからです。この場合、Sparkがコンパイル時にチェックするのは、一番外側のクラスが Product (すべての case class がそうです) そして、実行時に初めて、まだ MyObj (を作ろうとすると、同じ問題が発生します)。 Dataset[(Int,MyObj)] - Sparkは、実行時に嘔吐するのを待ちます。 MyObj ). これらは、修正することが切実に求められている中心的な問題です。

  • を拡張する一部のクラスは Product は、実行時に必ずクラッシュするにもかかわらずコンパイルされ
  • ネストされた型のカスタムエンコーダーを渡す方法がない(Sparkに、単に MyObj をどのようにエンコードするのかがわかるように Wrap[MyObj] または (Int,MyObj) ).

を使うだけです。 kryo

誰もが提案する解決策は kryo エンコーダーを使用します。

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

しかし、これはかなり面倒なことです。特に、コードがあらゆる種類のデータセットを操作し、結合やグループ化などを行っている場合です。結局、余分なインプリシットの束を積み重ねることになります。そこで、これをすべて自動で行う暗黙の了解を作ればいいのではないでしょうか。

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

そして今、ほとんど何でもできるようになったようです(下記の例は spark-shell ここで spark.implicits._ は自動的にインポートされます)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

というかほとんど。問題は kryo は、Sparkがデータセットの各行をフラットなバイナリオブジェクトとして保存することにつながります。そのため map , filter , foreach のような操作では、それで十分です。 join Sparkは、これらをカラムに分離する必要があります。のスキーマを検査すると d2 または d3 バイナリ列が1つだけであることがわかると思います。

d2.printSchema
// root
//  |-- value: binary (nullable = true)

タプルの部分的な解決策

そこで、Scalaのimplicitの魔法を使い(詳しくは 6.26.3 オーバーロードの解決 ) 、少なくともタプルについては、できるだけ良い仕事をする一連の暗示を自分で作ることができ、既存の暗示とうまく機能することができます。

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

そして、これらのインプリシットで武装して、いくつかのカラムの名前を変更しながらも、上記の例を動作させることができます。

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

期待されるタプル名( _1 , _2 を、名前を変更せずにデフォルトで使用することができます。 これ は、名前 "value" が導入され これ は、通常タプル名を追加する場所です。しかし、重要なのは、これで立派な構造化スキーマができたということです。

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

つまり、要約すると、このワークアラウンドです。

  • タプルに対して別々のカラムを得ることができる(これでまたタプルに結合できる、やったー!)。
  • を渡す必要はなく、再びインプリシットに依存することができます。 kryo を使用します)。
  • とはほぼ完全に後方互換性があります。 import spark.implicits._ (若干の名前の変更が必要)
  • ない に参加させてください。 kyro を持つフィールドはもちろんのこと、シリアライズされたバイナリカラムにも適用されます。
  • は、タプルカラムの名前を "value" に変更するという不快な副作用があります (必要であれば、これを元に戻すには .toDF そして、スキーマ名は最も必要とされる結合によって保存されるようです)。

クラス全般の部分的な解決策

こちらはあまり気持ちの良いものではありませんし、良い解決策もありません。しかし、今、私たちは上記のタプルの解決策を持っているので、私は別の答えからの暗黙の変換の解決策は、あなたのより複雑なクラスをタプルに変換することができるので、あまりにも少し痛みを軽減する予感がする。そして、データセットを作成した後、データフレームのアプローチでカラムの名前を変更することになるでしょう。すべてがうまくいけば、これは 本当に クラスのフィールドで結合を実行できるようになったので、改善されました。もし、1つのフラットバイナリ kryo シリアライザーは、このようなことを可能にするものではありませんでした。

以下は、あらゆることを行う例です。 MyObj という型のフィールドを持ち Int , java.util.UUID および Set[String] . 最初のものは、それ自体で処理されます。2番目は kryo として格納した方が便利です。 String (以下 UUID は、通常、結合したいものです)。3番目はバイナリカラムに入れるべきものです。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

さて、この機械を使って、素敵なスキーマを持つデータセットを作ることができる。

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

そしてスキーマには、正しい名前のカラムと、最初の2つのカラムが表示されており、どちらも結合することができます。

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)