scala - tutorial - spark sql




¿Cómo almacenar objetos personalizados en el conjunto de datos? (6)

Actualizar

Esta respuesta sigue siendo válida e informativa, aunque las cosas ahora son mejores desde 2.2 / 2.3, que agrega soporte de codificador incorporado para Set , Seq , Map , Date , Timestamp y BigDecimal . Si se limita a crear tipos con solo clases de casos y los tipos Scala habituales, debería estar bien con lo implícito en SQLImplicits .

Desafortunadamente, prácticamente no se ha agregado nada para ayudar con esto. La búsqueda de @since 2.0.0 en Encoders.scala o SQLImplicits.scala encuentra cosas principalmente relacionadas con los tipos primitivos (y algunos ajustes de las clases de casos). Entonces, lo primero que hay que decir: actualmente no hay un soporte realmente bueno para codificadores de clase personalizados . Con eso fuera del camino, lo que sigue son algunos trucos que hacen un trabajo tan bueno como podemos esperar, dado lo que tenemos actualmente a nuestra disposición. Como descargo de responsabilidad por adelantado: esto no funcionará perfectamente y haré todo lo posible para que todas las limitaciones sean claras y directas.

Cuál es el problema exactamente

Cuando desee crear un conjunto de datos, Spark "requiere un codificador (para convertir un objeto JVM de tipo T SparkSession desde la representación interna de Spark SQL) que generalmente se crea automáticamente mediante SparkSession desde una SparkSession , o puede crearse explícitamente llamando a static métodos en Encoders "(tomado de los documentos en createDataset ). Un codificador tomará la forma Encoder[T] donde T es el tipo que está codificando. La primera sugerencia es agregar import spark.implicits._ (que le proporciona these codificadores implícitos) y la segunda sugerencia es pasar explícitamente el codificador implícito utilizando this conjunto de funciones relacionadas con el codificador.

No hay codificador disponible para clases regulares, por lo que

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

le dará el siguiente error de tiempo de compilación relacionado implícito:

No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, String, etc.) y los tipos de productos (clases de casos) son compatibles con la importación de sqlContext.implicits. En futuras versiones se agregará soporte para serializar otros tipos.

Sin embargo, si ajusta el tipo que acaba de usar para obtener el error anterior en alguna clase que extiende el Product , el error se retrasa confusamente en tiempo de ejecución, por lo que

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compila bien, pero falla en tiempo de ejecución con

java.lang.UnsupportedOperationException: no se ha encontrado ningún codificador para MyObj

La razón de esto es que los codificadores que Spark crea con las implicidades en realidad solo se hacen en tiempo de ejecución (a través de la reescalación de escala). En este caso, todas las comprobaciones de Spark en el momento de la compilación es que la clase más externa extiende el Product (que hacen todas las clases de casos), y solo se da cuenta en el tiempo de ejecución de que todavía no sabe qué hacer con MyObj (el mismo problema ocurre si lo intento hacer un Dataset[(Int,MyObj)] - Spark espera hasta el tiempo de ejecución para vomitar en MyObj ). Estos son problemas centrales que necesitan urgentemente ser solucionados:

  • algunas clases que amplían la compilación del Product pesar de fallar siempre en tiempo de ejecución y
  • no hay forma de pasar codificadores personalizados para tipos anidados (no tengo forma de alimentar a Spark con un codificador solo para MyObj modo que sepa codificar Wrap[MyObj] o (Int,MyObj) ).

Solo usa kryo

La solución que todos sugieren es usar el codificador kryo .

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Sin embargo, esto se vuelve bastante tedioso rápidamente. Especialmente si su código está manipulando todo tipo de conjuntos de datos, uniéndose, agrupando, etc. Usted termina acumulando un montón de problemas adicionales. Entonces, ¿por qué no hacer un implícito que hace todo esto automáticamente?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

Y ahora, parece que puedo hacer casi cualquier cosa que quiera (el ejemplo a continuación no funcionará en el spark-shell donde spark.implicits._ se importa automáticamente)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

O casi. El problema es que el uso de kryo lleva a Spark a almacenar cada fila del conjunto de datos como un objeto binario plano. Para map , filter , foreach es suficiente, pero para operaciones como join , Spark realmente necesita que se separen en columnas. Al inspeccionar el esquema para d2 o d3 , verá que solo hay una columna binaria:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Solución parcial para tuplas.

Entonces, usando la magia de las implicidades en Scala (más en 6.26.3 Resolución de sobrecarga ), puedo hacer una serie de implicidades que harán el mejor trabajo posible, al menos para las tuplas, y funcionarán bien con las implicidades existentes:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Luego, armado con estas dificultades, puedo hacer que mi ejemplo anterior funcione, aunque con un cambio de nombre de columna

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Todavía no he descubierto cómo obtener los nombres de tupla esperados ( _1 , _2 , ...) de forma predeterminada sin cambiarles el nombre; si alguien más quiere jugar con esto, this es donde se introduce el nombre "value" y this es donde generalmente se agregan los nombres de tupla. Sin embargo, el punto clave es que ahora tengo un buen esquema estructurado:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Entonces, en resumen, esta solución alternativa:

  • nos permite obtener columnas separadas para las tuplas (para que podamos unirnos nuevamente en las tuplas, ¡sí!)
  • nuevamente podemos confiar en las implicidades (así que no hay necesidad de pasar kryo todos lados)
  • es casi totalmente retrocompatible con import spark.implicits._ (con algunos import spark.implicits._ nombre)
  • no nos permite unirnos en las columnas binarias serializadas kyro , y mucho menos en los campos que puedan tener
  • tiene el efecto secundario desagradable de renombrar algunas de las columnas de tupla a "valor" (si es necesario, esto se puede deshacer convirtiendo .toDF , especificando nuevos nombres de columna y volviendo a convertirlos en un conjunto de datos, y los nombres de esquema parecen conservarse a través de uniones, donde más se necesitan).

Solución parcial para clases en general.

Este es menos agradable y no tiene una buena solución. Sin embargo, ahora que tenemos la solución de tupla anterior, tengo el presentimiento de que la solución de conversión implícita de otra respuesta también será un poco menos dolorosa ya que puede convertir sus clases más complejas en tuplas. Luego, después de crear el conjunto de datos, probablemente cambiaría el nombre de las columnas utilizando el enfoque de marco de datos. Si todo va bien, esto es realmente una mejora ya que ahora puedo realizar uniones en los campos de mis clases. Si hubiera utilizado un serializador de kryo binario plano, eso no hubiera sido posible.

Aquí hay un ejemplo que hace un poco de todo: tengo una clase MyObj que tiene campos de tipos Int , java.util.UUID y Set[String] . El primero se cuida solo. El segundo, aunque podría serializar usando kryo , sería más útil si se almacenara como una String (ya que los UUID generalmente son algo contra lo que querré unirme). El tercero realmente solo pertenece a una columna binaria.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Ahora, puedo crear un conjunto de datos con un buen esquema usando esta maquinaria:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

Y el esquema me muestra columnas I con los nombres correctos y con las dos primeras cosas con las que me puedo unir.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

De acuerdo con la introducción de conjuntos de datos de Spark :

A medida que esperamos Spark 2.0, planeamos algunas mejoras interesantes para los conjuntos de datos, específicamente: ... Codificadores personalizados: si bien actualmente generamos codificadores de forma automática para una amplia variedad de tipos, nos gustaría abrir una API para objetos personalizados.

e intenta almacenar el tipo personalizado en un Dataset conduce al siguiente error como:

No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, String, etc.) y los tipos de productos (clases de casos) son compatibles con la importación de sqlContext.implicits. En futuras versiones se agregará soporte para serializar otros tipos.

o:

Java.lang.UnsupportedOperationException: no se ha encontrado ningún codificador para ...

¿Hay alguna solución existente?

Tenga en cuenta que esta pregunta existe solo como un punto de entrada para una respuesta Wiki de la comunidad. Siéntase libre de actualizar / mejorar tanto la pregunta como la respuesta.


  1. Usando codificadores genéricos.

    Hay dos codificadores genéricos disponibles para ahora kryo y javaSerialization donde el último se describe explícitamente como:

    extremadamente ineficiente y solo debe usarse como último recurso.

    Asumiendo la siguiente clase

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }

    puede usar estos codificadores agregando un codificador implícito:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }

    que se pueden usar juntos de la siguiente manera:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }

    Almacena objetos como columna binary , por lo que cuando se convierte en DataFrame obtiene el siguiente esquema:

    root
     |-- value: binary (nullable = true)

    También es posible codificar tuplas usando el codificador kryo para un campo específico:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]

    Tenga en cuenta que no dependemos de codificadores implícitos aquí, sino que pasamos el codificador explícitamente, por lo que es muy probable que esto no funcione con el método toDS .

  2. Usando conversiones implícitas:

    Proporcione conversiones implícitas entre la representación que se puede codificar y la clase personalizada, por ejemplo:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }

Preguntas relacionadas:

  • ¿Cómo crear un codificador para el constructor de tipo Opción, por ejemplo, Opción [Int]?

En el caso de la clase Java Bean, esto puede ser útil

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Ahora puede simplemente leer el DataFrame como DataFrame personalizado

dataFrame.as[MyClass]

Esto creará un codificador de clase personalizado y no uno binario.


Los codificadores funcionan más o menos igual en Spark2.0 . Y Kryo sigue siendo la opción de serialization recomendada.

Puedes ver el siguiente ejemplo con spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Hasta ahora] no había appropriate encoders en el alcance actual, por lo que nuestras personas no estaban codificadas como valores binary . Pero eso cambiará una vez que proporcionemos algunos codificadores implicit usando la serialización de Kryo .

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

Puede usar UDTRegistration y luego Case Classes, Tuples, etc ... ¡todo funciona correctamente con su Tipo definido por el usuario!

Digamos que quieres usar una enumeración personalizada:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Regístralo así:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Entonces úsalo!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Digamos que quieres usar un registro polimórfico:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... y lo usamos así:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Puede escribir un UDT personalizado que codifique todo en bytes (estoy usando la serialización de Java aquí, pero probablemente sea mejor instrumentar el contexto Kryo de Spark).

Primero defina la clase UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Entonces regístralo:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

¡Entonces puedes usarlo!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Mis ejemplos estarán en Java, pero no creo que sea difícil adaptarse a Scala.

He tenido bastante éxito al convertir RDD<Fruit> a Dataset<Fruit> usando spark.createDataset y Encoders.bean siempre que Fruit sea ​​un Java Bean simple.

Paso 1: Crea el Java Bean simple.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Me apegaría a clases con tipos primitivos y String como campos antes de que la gente de DataBricks refuerce sus codificadores. Si tiene una clase con objeto anidado, cree otro Java Bean simple con todos sus campos aplanados, para que pueda usar las transformaciones RDD para asignar el tipo complejo al más simple. Claro que es un poco de trabajo extra, pero imagino que ayudará mucho en el rendimiento al trabajar con un esquema plano.

Paso 2: Obtenga su conjunto de datos del RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

¡Y voilá! Enjabonar, enjuagar, repetir.





apache-spark-encoders