scala apache - ¿Cómo almacenar objetos personalizados en Dataset?




spark tutorial (6)

De acuerdo con Introducing Spark Datasets :

A medida que esperamos Spark 2.0, planeamos algunas mejoras interesantes para los conjuntos de datos, específicamente: ... Encoders personalizados: mientras que actualmente autogeneramos codificadores para una amplia variedad de tipos, nos gustaría abrir una API para objetos personalizados.

e intenta almacenar el tipo personalizado en un Dataset conduce a un error siguiente como:

No se puede encontrar el codificador para el tipo almacenado en un Dataset. Los tipos primitivos (Int, String, etc.) y los tipos de productos (clases de casos) son compatibles con la importación de sqlContext.implicits._ Se agregará soporte para serializar otros tipos en versiones futuras.

o:

Java.lang.UnsupportedOperationException: no se encontró ningún codificador para ....

¿Hay alguna solución provisional?

Tenga en cuenta que esta pregunta solo existe como un punto de entrada para una respuesta Wiki comunitario. Siéntase libre de actualizar / mejorar tanto la pregunta como la respuesta.


Answers

Actualizar

Esta respuesta sigue siendo válida e informativa, aunque las cosas ahora son mejores desde 2.2 / 2.3, que agrega soporte de codificador incorporado para Set , Seq , Map , Date , Timestamp y BigDecimal . Si se limita a crear tipos con solo clases de casos y los tipos habituales de Scala, debería estar bien solo con lo implícito en SQLImplicits .

Lamentablemente, prácticamente no se ha agregado nada para ayudar con esto. La búsqueda de @since 2.0.0 en Encoders.scala o SQLImplicits.scala encuentra que las cosas principalmente tienen que ver con los tipos primitivos (y algunos ajustes de las clases de casos). Entonces, lo primero que hay que decir es que actualmente no existe una buena compatibilidad con los codificadores de clases personalizados . Con eso fuera del camino, lo que sigue son algunos trucos que hacen un trabajo tan bueno como podemos esperar, dado lo que tenemos actualmente a nuestra disposición. Como un descargo de responsabilidad inicial: esto no funcionará perfectamente y haré todo lo posible para dejar claras y claras todas las limitaciones.

Cuál es el problema exactamente

Cuando desee crear un conjunto de datos, Spark "requiere un codificador (para convertir un objeto JVM de tipo T a y desde la representación interna de Spark SQL) que generalmente se crea automáticamente a través de implicits de una SparkSession , o puede crearse explícitamente al invocar estática métodos en Encoders "(tomado de los documentos en createDataset ). Un codificador tomará la forma Encoder[T] donde T es el tipo que está codificando. La primera sugerencia es agregar import spark.implicits._ (que le da these codificadores implícitos) y la segunda sugerencia es pasar explícitamente en el codificador implícito utilizando this conjunto de funciones relacionadas con el codificador.

No hay un codificador disponible para las clases regulares, por lo

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

le dará el siguiente error de tiempo de compilación implícito relacionado:

No se puede encontrar el codificador para el tipo almacenado en un Dataset. Los tipos primitivos (Int, String, etc.) y los tipos de productos (clases de casos) son compatibles con la importación de sqlContext.implicits._ Se agregará soporte para serializar otros tipos en versiones futuras.

Sin embargo, si ajusta el tipo que acaba de utilizar para obtener el error anterior en alguna clase que extienda el Product , el error confusamente se retrasa al tiempo de ejecución, por lo que

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compila muy bien, pero falla en tiempo de ejecución con

java.lang.UnsupportedOperationException: No se ha encontrado ningún codificador para MyObj

La razón de esto es que los codificadores que Spark crea con los implicits en realidad solo se fabrican en tiempo de ejecución (a través de scala relfection). En este caso, todas las comprobaciones de Spark en tiempo de compilación es que la clase más externa extiende Product (lo que hacen todas las clases de caso), y solo se da cuenta en tiempo de ejecución que aún no sabe qué hacer con MyObj (el mismo problema ocurre si lo intento) para hacer un Dataset[(Int,MyObj)] - Spark espera hasta el tiempo de ejecución para barf en MyObj ). Estos son problemas centrales que necesitan urgentemente ser reparados:

  • algunas clases que extienden la compilación de Product pesar de que siempre se cuelgan en el tiempo de ejecución y
  • no hay forma de pasar en codificadores personalizados para tipos anidados (no tengo forma de alimentar Spark a un codificador solo para MyObj manera que luego sepa cómo codificar Wrap[MyObj] o (Int,MyObj) ).

Solo usa kryo

La solución que todos sugieren es usar el codificador de kryo .

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Sin embargo, esto se vuelve bastante tedioso rápidamente. Especialmente si su código está manipulando todo tipo de conjuntos de datos, uniéndose, agrupando, etc. Terminan acumulando un montón de implícitas adicionales. Entonces, ¿por qué no simplemente hacer un implícito que hace esto todo automáticamente?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

Y ahora, parece que puedo hacer casi todo lo que quiero (el siguiente ejemplo no funcionará en la spark-shell donde spark.implicits._ se importa automáticamente)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

O casi El problema es que usar kryo lleva a Spark simplemente almacenando cada fila en el conjunto de datos como un objeto binario plano. Para map , filter , foreach es suficiente, pero para operaciones como join , Spark realmente necesita que estos se separen en columnas. Al inspeccionar el esquema para d2 o d3 , verá que hay solo una columna binaria:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Solución parcial para tuplas

Entonces, usando la magia de las implicidades en Scala (más en 6.26.3 Overloading Resolution ), puedo hacerme una serie de implicaciones que harán el mejor trabajo posible, al menos para las tuplas, y funcionarán bien con las implicaciones existentes:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Entonces, armado con estas implicaciones, puedo hacer que mi ejemplo anterior funcione, aunque con el cambio de nombre de algunas columnas

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Todavía no he descubierto cómo obtener los nombres de tupla esperados ( _1 , _2 , ...) de forma predeterminada sin cambiar el nombre; si alguien más quiere jugar con esto, this es donde se introduce el nombre "value" y this es donde generalmente se agregan los nombres de las tuplas. Sin embargo, el punto clave es que ahora tengo un bonito esquema estructurado:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Entonces, en resumen, esta solución alternativa:

  • nos permite obtener columnas separadas para tuplas (para que podamos unirnos a las tuplas nuevamente, ¡sí!)
  • de nuevo solo podemos confiar en las implícitas (por lo que no es necesario pasar kryo todo el lugar)
  • es casi completamente compatible con versiones anteriores de import spark.implicits._ (con algunos import spark.implicits._ nombre implicados)
  • no nos permite unirnos a las columnas binarias serializadas kyro , y mucho menos en los campos que pueden tener
  • tiene el desagradable efecto secundario de cambiar el nombre de algunas de las columnas de la tupla a "valor" (si es necesario, puede deshacerlo convirtiendo .toDF , especificando nuevos nombres de columna y convirtiendo nuevamente a un conjunto de datos, y los nombres de esquema parecen conservarse a través de las uniones, donde más se necesitan).

Solución parcial para clases en general

Este es menos agradable y no tiene una buena solución. Sin embargo, ahora que tenemos la solución de tupla anterior, tengo la corazonada de que la solución de conversión implícita de otra respuesta será un poco menos dolorosa también, ya que puedes convertir tus clases más complejas en tuplas. Luego, después de crear el conjunto de datos, probablemente cambie el nombre de las columnas con el enfoque de marco de datos. Si todo va bien, esto es realmente una mejora ya que ahora puedo realizar uniones en los campos de mis clases. Si hubiera usado un serializador de kryo binario plano que no hubiera sido posible.

Aquí hay un ejemplo que hace un poco de todo: tengo una clase MyObj que tiene campos de tipos Int , java.util.UUID y Set[String] . El primero se cuida a sí mismo. El segundo, aunque podría serializar usando kryo , sería más útil si se almacena como una String (ya que los UUID generalmente son algo con lo que querré unirme). El tercero realmente solo pertenece a una columna binaria.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Ahora, puedo crear un conjunto de datos con un buen esquema usando esta maquinaria:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

Y el esquema me muestra las columnas con los nombres correctos y con las dos primeras cosas a las que me puedo unir.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

Los codificadores funcionan más o menos igual en Spark2.0 . Y Kryo sigue siendo la opción de serialization recomendada.

Puedes ver el siguiente ejemplo con spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Hasta ahora] no había appropriate encoders en el alcance actual, por lo que nuestras personas no estaban codificadas como valores binary . Pero eso cambiará una vez que proporcionemos algunos codificadores implicit usando la serialización de Kryo .

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

  1. Usando codificadores genéricos.

    Hay dos codificadores genéricos disponibles para ahora kryo y javaSerialization donde el último se describe explícitamente como:

    extremadamente ineficiente y solo debe usarse como último recurso.

    Asumiendo la siguiente clase

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    puede usar estos codificadores agregando un codificador implícito:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    que se pueden usar juntos de la siguiente manera:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    Almacena objetos como columna binary , por lo que cuando se convierte a DataFrame se obtiene el siguiente esquema:

    root
     |-- value: binary (nullable = true)
    

    También es posible codificar tuplas usando el codificador kryo para un campo específico:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Tenga en cuenta que no dependemos de codificadores implícitos aquí, pero pasamos el codificador explícitamente, por lo que probablemente no funcione con el método toDS .

  2. Usar conversiones implícitas:

    Proporcione conversiones implícitas entre representaciones que pueden codificarse y clases personalizadas, por ejemplo:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

Preguntas relacionadas:

  • Cómo crear un codificador para el constructor de tipo de Opción, por ejemplo, Opción [Int]?

Mis ejemplos estarán en Java, pero no me imagino que será difícil adaptarme a Scala.

He tenido bastante éxito al convertir RDD<Fruit> en Dataset<Fruit> utilizando spark.createDataset y Encoders.bean , siempre y cuando Fruit sea ​​un simple Java Bean .

Paso 1: crea el Java Bean simple.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Me apegaba a las clases con tipos primitivos y String como campos antes de que las personas de DataBricks reforzaran sus Encoders. Si tiene una clase con un objeto anidado, cree otro Java Bean simple con todos sus campos aplanados, para que pueda usar las transformaciones de RDD para asignar el tipo complejo al más simple. Claro que es un poco más de trabajo, pero me imagino que ayudará mucho en el rendimiento trabajando con un esquema plano.

Paso 2: Obtenga su Dataset del RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

¡Y voilá! Enjabona, enjuaga, repite.


En el caso de la clase Java Bean, esto puede ser útil

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Ahora puede simplemente leer el DataFrame como un DataFrame personalizado

dataFrame.as[MyClass]

Esto creará un codificador de clase personalizado y no uno binario.


Seguir:

Mi conclusión provisional es que las bibliotecas de Atlas instaladas por defecto en la instancia de Amazon EMR son simplemente lentas. O bien es una versión genérica que no se ha optimizado para el tipo de máquina específico, o es fundamentalmente más lenta que otras bibliotecas. Usando este script como guía, construí e instalé OpenBLAS para el tipo de máquina específico en el que ejecutaba los puntos de referencia (también encontré información útil here ). Una vez que se instaló OpenBLAS, mi punto de referencia de multiplicación de matriz de 3000x3000 se completó en 3.9s (en comparación con los 15.1 enumerados anteriormente cuando se usan las librerías Atlas por defecto). Esto es aún más lento que el mismo benchmark ejecutado en mi Mac (por un factor de x2), pero esta diferencia se encuentra en un rango que podría ser creíble debido al rendimiento h / w subyacente.

Aquí hay una lista completa de los comandos que utilicé para instalar OpenBLAS libs en el EMR de Amazon, instancia de Spark:

sudo yum install git
git clone https://github.com/xianyi/OpenBlas.git
cd OpenBlas/
make clean
make -j4
sudo mkdir /usr/lib64/OpenBLAS
sudo chmod o+w,g+w /usr/lib64/OpenBLAS/
make PREFIX=/usr/lib64/OpenBLAS install
sudo rm /etc/ld.so.conf.d/atlas-x86_64.conf 
sudo ldconfig
sudo ln -sf /usr/lib64/OpenBLAS/lib/libopenblas.so /usr/lib64/libblas.so
sudo ln -sf /usr/lib64/OpenBLAS/lib/libopenblas.so /usr/lib64/libblas.so.3
sudo ln -sf /usr/lib64/OpenBLAS/lib/libopenblas.so /usr/lib64/libblas.so.3.5
sudo ln -sf /usr/lib64/OpenBLAS/lib/libopenblas.so /usr/lib64/liblapack.so
sudo ln -sf /usr/lib64/OpenBLAS/lib/libopenblas.so /usr/lib64/liblapack.so.3
sudo ln -sf /usr/lib64/OpenBLAS/lib/libopenblas.so /usr/lib64/liblapack.so.3.5




scala apache-spark apache-spark-dataset