scala - Funken Sie UDAF mit ArrayType als bufferSchema-Leistungsprobleme



performance apache-spark (1)

Ich arbeite an einer UDAF, die eine Reihe von Elementen zurückgibt.

Die Eingabe für jede Aktualisierung ist ein Tupel aus Index und Wert.

Der UDAF summiert alle Werte unter demselben Index.

Beispiel:

Für die Eingabe (Index, Wert): (2,1), (3,1), (2,3)

sollte zurückkehren (0,0,4,1, ..., 0)

Die Logik funktioniert einwandfrei, aber ich habe ein Problem mit der Aktualisierungsmethode . Meine Implementierung aktualisiert nur 1 Zelle für jede Zeile , aber die letzte Zuweisung in dieser Methode kopiert tatsächlich das gesamte Array - was redundant und extrem zeitaufwendig ist.

Diese Zuweisung allein ist für 98% meiner Abfrageausführungszeit verantwortlich .

Meine Frage ist, wie kann ich diese Zeit verkürzen? Ist es möglich, 1 Wert im Pufferarray zuzuweisen, ohne den gesamten Puffer ersetzen zu müssen?

PS: Ich arbeite mit Spark 1.6 und kann es in Kürze nicht mehr aktualisieren. Halten Sie sich daher bitte an die Lösung, die mit dieser Version funktionieren würde.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

TL; DR Verwenden Sie entweder kein UDAF oder primitive Typen anstelle von ArrayType .

Ohne UserDefinedFunction

Bei beiden Lösungen sollte das teure Jonglieren zwischen interner und externer Repräsentation entfallen.

Verwendung von Standardaggregaten und pivot

Dies verwendet Standard-SQL-Aggregationen. Bei der internen Optimierung kann es teuer werden, wenn die Anzahl der Schlüssel und die Größe des Arrays zunehmen.

Vorgegebene Eingabe:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

Du kannst:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

Verwenden der RDD-API mit combineByKey / aggregateByKey .

Einfache alte byKey Aggregation mit veränderlichem Puffer. Kein Schnickschnack, sollte aber mit einer großen Bandbreite von Eingängen einigermaßen funktionieren. Wenn Sie vermuten, dass Eingaben spärlich sind, können Sie eine effizientere Zwischendarstellung in Betracht ziehen, z. B. eine veränderbare Map .

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

Verwenden von UserDefinedFunction mit primitiven Typen

Soweit ich die Interna verstehe, ist der Leistungsengpass ArrayConverter.toCatalystImpl .

Es sieht so aus, als würde es für jeden Aufruf von MutableAggregationBuffer.update und GenericArrayData Row nach jeder Row neue GenericArrayData zu.

Wenn wir bufferSchema neu bufferSchema als:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}

Sowohl update als auch merge können als einfache Ersetzungen von Grundwerten im Puffer ausgedrückt werden. Die Anrufkette wird ziemlich lang bleiben, aber es werden keine Kopien / Konvertierungen und verrückten Zuweisungen benötigt. Wenn Sie keine Überprüfungen durchführen, benötigen Sie etwas Ähnliches

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))

und

for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}

beziehungsweise.

Schließlich sollte evaluate Row nehmen und konvertieren, um Seq auszugeben:

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

Bitte beachten Sie, dass in dieser Implementierung ein möglicher Engpass merge . Obwohl dies keine neuen Leistungsprobleme mit sich bringen sollte, ist bei M Buckets jeder Aufruf zum merge O (M) .

Mit K eindeutigen Schlüsseln und P Partitionen wird es im schlimmsten Fall M * K- mal aufgerufen, wenn jeder Schlüssel auf jeder Partition mindestens einmal vorkommt. Dies erhöht effektiv die Komplizenschaft der merge mit O (M · N · K) .

Im Allgemeinen kann man nicht viel dagegen tun. Wenn Sie jedoch bestimmte Annahmen über die Datenverteilung treffen (Daten sind spärlich, Schlüsselverteilung ist gleichmäßig), können Sie die Dinge ein wenig verkürzen und zuerst mischen:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

Wenn die Annahmen erfüllt sind, sollte es:

  • Reduzieren Sie die Shuffle-Größe intuitiv, indem Sie spärliche Paare anstelle dichter Array-ähnlicher Rows mischen.
  • Aggregieren Sie Daten nur mit Aktualisierungen (jedes O (1) ), die sich möglicherweise nur als Teilmenge der Indizes berühren.

Wenn jedoch eine oder beide Annahmen nicht erfüllt sind, können Sie davon ausgehen, dass die Zufallsgröße zunimmt, während die Anzahl der Aktualisierungen gleich bleibt. Gleichzeitig kann ein Datenversatz die Situation noch verschlimmern als im update shuffle merge Szenario.

Verwenden von Aggregator mit "stark" typisiertem Dataset :

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

welches wie unten gezeigt verwendet werden könnte

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+




user-defined-functions