scala - Funken Sie UDAF mit ArrayType als bufferSchema-Leistungsprobleme
performance apache-spark (1)
Ich arbeite an einer UDAF, die eine Reihe von Elementen zurückgibt.
Die Eingabe für jede Aktualisierung ist ein Tupel aus Index und Wert.
Der UDAF summiert alle Werte unter demselben Index.
Beispiel:
Für die Eingabe (Index, Wert): (2,1), (3,1), (2,3)
sollte zurückkehren (0,0,4,1, ..., 0)
Die Logik funktioniert einwandfrei, aber ich habe ein Problem mit der Aktualisierungsmethode . Meine Implementierung aktualisiert nur 1 Zelle für jede Zeile , aber die letzte Zuweisung in dieser Methode kopiert tatsächlich das gesamte Array - was redundant und extrem zeitaufwendig ist.
Diese Zuweisung allein ist für 98% meiner Abfrageausführungszeit verantwortlich .
Meine Frage ist, wie kann ich diese Zeit verkürzen? Ist es möglich, 1 Wert im Pufferarray zuzuweisen, ohne den gesamten Puffer ersetzen zu müssen?
PS: Ich arbeite mit Spark 1.6 und kann es in Kürze nicht mehr aktualisieren. Halten Sie sich daher bitte an die Lösung, die mit dieser Version funktionieren würde.
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
TL; DR
Verwenden Sie entweder kein UDAF oder primitive Typen anstelle von
ArrayType
.
Ohne
UserDefinedFunction
Bei beiden Lösungen sollte das teure Jonglieren zwischen interner und externer Repräsentation entfallen.
Verwendung von Standardaggregaten und
pivot
Dies verwendet Standard-SQL-Aggregationen. Bei der internen Optimierung kann es teuer werden, wenn die Anzahl der Schlüssel und die Größe des Arrays zunehmen.
Vorgegebene Eingabe:
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
Du kannst:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}
val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)
df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Verwenden der RDD-API mit
combineByKey
/
aggregateByKey
.
Einfache alte
byKey
Aggregation mit veränderlichem Puffer.
Kein Schnickschnack, sollte aber mit einer großen Bandbreite von Eingängen einigermaßen funktionieren.
Wenn Sie vermuten, dass Eingaben spärlich sind, können Sie eine effizientere Zwischendarstellung in Betracht ziehen, z. B. eine veränderbare
Map
.
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Verwenden von
UserDefinedFunction
mit primitiven Typen
Soweit ich die Interna verstehe, ist der Leistungsengpass
ArrayConverter.toCatalystImpl
.
Es sieht so aus, als würde es für jeden Aufruf von
MutableAggregationBuffer.update
und
GenericArrayData
Row
nach jeder
Row
neue
GenericArrayData
zu.
Wenn wir
bufferSchema
neu
bufferSchema
als:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
Sowohl
update
als auch
merge
können als einfache Ersetzungen von Grundwerten im Puffer ausgedrückt werden.
Die Anrufkette wird ziemlich lang bleiben, aber
es werden keine Kopien / Konvertierungen
und verrückten Zuweisungen benötigt.
Wenn Sie keine Überprüfungen durchführen, benötigen Sie etwas Ähnliches
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
und
for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
beziehungsweise.
Schließlich sollte
evaluate
Row
nehmen und konvertieren, um
Seq
auszugeben:
for (i <- 0 to nBuckets) yield buffer.getLong(i)
Bitte beachten Sie, dass in dieser Implementierung ein möglicher Engpass
merge
.
Obwohl dies keine neuen Leistungsprobleme mit sich bringen sollte, ist bei
M
Buckets jeder Aufruf zum
merge
O (M)
.
Mit
K
eindeutigen Schlüsseln und
P
Partitionen wird es im schlimmsten Fall
M * K-
mal aufgerufen, wenn jeder Schlüssel auf jeder Partition mindestens einmal vorkommt.
Dies erhöht effektiv die Komplizenschaft der
merge
mit
O (M · N · K)
.
Im Allgemeinen kann man nicht viel dagegen tun. Wenn Sie jedoch bestimmte Annahmen über die Datenverteilung treffen (Daten sind spärlich, Schlüsselverteilung ist gleichmäßig), können Sie die Dinge ein wenig verkürzen und zuerst mischen:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))
Wenn die Annahmen erfüllt sind, sollte es:
-
Reduzieren Sie die Shuffle-Größe intuitiv, indem Sie spärliche Paare anstelle dichter Array-ähnlicher
Rows
mischen. - Aggregieren Sie Daten nur mit Aktualisierungen (jedes O (1) ), die sich möglicherweise nur als Teilmenge der Indizes berühren.
Wenn jedoch eine oder beide Annahmen nicht erfüllt sind, können Sie davon ausgehen, dass die Zufallsgröße zunimmt, während die Anzahl der Aktualisierungen gleich bleibt.
Gleichzeitig kann ein Datenversatz die Situation noch verschlimmern als im
update
shuffle
merge
Szenario.
Verwenden von
Aggregator
mit "stark" typisiertem
Dataset
:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
with Serializable {
def zero = Array.fill(bucketSize)(0L)
def reduce(acc: Array[Long], x: I) = {
val (i, v) = f(x)
acc(i) += v
acc
}
def merge(acc1: Array[Long], acc2: Array[Long]) = {
for {
i <- 0 until bucketSize
} acc1(i) += acc2(i)
acc1
}
def finish(acc: Array[Long]) = acc.toSeq
def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
welches wie unten gezeigt verwendet werden könnte
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
ds
.groupByKey(_._1)
.agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
.show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2) |
+-----+-------------------------------+
|1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+