apache spark tutorial Come funziona la funzione di aggregazione Spark: aggregateByKey?




spark vs hadoop (2)

aggregateByKey() è quasi identico a reduceByKey() (entrambi richiamano combineByKey() dietro le quinte), tranne per il valore iniziale di aggregateByKey() . La maggior parte delle persone ha familiarità con reduceByKey() , quindi la userò nella spiegazione.

Il motivo per cui reduceByKey() è molto migliore è perché utilizza una funzione MapReduce chiamata combinatore. Qualsiasi funzione come + o * può essere utilizzata in questo modo perché l'ordine degli elementi su cui è chiamato non ha importanza. Ciò consente a Spark di iniziare a "ridurre" i valori con la stessa chiave anche se non sono ancora tutti nella stessa partizione.

Il lato opposto del groupByKey() ti offre una maggiore versatilità poiché scrivi una funzione che accetta un Iterable, il che significa che puoi persino estrarre tutti gli elementi in un array. Tuttavia è inefficiente perché per farlo funzionare la coppia completa di coppie (K,V,) deve essere in una partizione.

Il passaggio che sposta i dati su un'operazione di tipo riduttivo viene generalmente chiamato shuffle , al livello più semplice i dati vengono partizionati su ciascun nodo (spesso con un partizionatore hash) e quindi ordinati su ciascun nodo.

Diciamo che ho un sistema di distribuzione su 3 nodi e i miei dati sono distribuiti tra quei nodi. ad esempio, ho un file test.csv che esiste su tutti e 3 i nodi e contiene 2 colonne di:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

Quindi uso SparkContext.textFile per leggere il file come rdd e così via. Per quanto ho capito, ogni nodo spark worker leggerà una parte dal file. Quindi adesso diciamo che ogni nodo memorizzerà:

  • nodo 1: riga 1 ~ 4
  • nodo 2: riga 5 ~ 8
  • nodo 3: riga 9 ~ 12

La mia domanda è che diciamo che voglio fare il calcolo su quei dati, e c'è un passo che ho bisogno di raggruppare la chiave insieme, quindi la coppia di valori chiave sarebbe [k1 [{k1 c1} {k1 c2} {k1 c3}]].. e così via.

Esiste una funzione chiamata groupByKey() che è molto costosa da utilizzare e si consiglia di utilizzare aggregateByKey() . Quindi mi chiedo come funziona groupByKey() e aggregateByKey() sotto il cofano? Qualcuno può usare l'esempio che ho fornito sopra per spiegare per favore? Dopo aver mischiato dove risiedono le righe su ciascun nodo?


aggregateByKey () è abbastanza diverso da reduceByKey. Quello che succede è che ridurreByKey è una specie di caso particolare di aggregateByKey.

aggregateByKey () combina i valori per una particolare chiave e il risultato di tale combinazione può essere qualsiasi oggetto specificato. È necessario specificare in che modo i valori vengono combinati ("aggiunti") all'interno di una partizione (che viene eseguita nello stesso nodo) e come si combina il risultato di partizioni diverse (che possono trovarsi in nodi diversi). reduceByKey è un caso particolare, nel senso che il risultato della combinazione (ad esempio una somma) è dello stesso tipo dei valori e che l'operazione combinata da partizioni diverse è uguale all'operazione quando si combinano valori all'interno di una partizione.

Un esempio: immagina di avere una lista di coppie. Lo si parallelizza:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

Ora vuoi "combinarli" per chiave producendo una somma. In questo caso ridurreByKey e aggregateByKey sono gli stessi:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Ora immagina che vuoi che l'aggregazione sia un Set di valori, che è un tipo diverso dai valori, che sono numeri interi (la somma di interi è anche un numero intero):

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))




distributed-computing