scala - Verwendung von COGROUP für große Datensätze




apache-spark apache-spark-sql (2)

Wenn Sie collect() , weisen Sie spark grundsätzlich an, alle resultierenden Daten zurück zum Masterknoten zu verschieben, was leicht zu einem Engpass führen kann. Zu diesem Zeitpunkt verwenden Sie Spark nicht mehr, sondern nur noch ein einfaches Array auf einem einzelnen Computer.

Um die Berechnung auszulösen, verwenden Sie einfach etwas, das die Daten an jedem Knoten benötigt. Deshalb leben die Ausführenden auf einem verteilten Dateisystem. Zum Beispiel saveAsTextFile() .

Hier sind einige grundlegende Beispiele.

Denken Sie daran, dass das gesamte Ziel hier (d. H., Wenn Sie über große Datenmengen verfügen) darin besteht, den Code in Ihre Daten zu verschieben und dort zu berechnen, und nicht alle Daten zur Berechnung zu bringen.

Ich habe zwei rdd's nämlich val tab_a: RDD[(String, String)] und val tab_b: RDD[(String, String)] Ich benutze cogroup für diese Datensätze wie:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  {
 //somecode
  }
}

Ich verwende tab_c tab_c-Werte für die Kartenfunktion und es funktioniert gut für kleine Datasets, aber im Falle von großen Datasets wird eine Out Of Memory exception tab_c unzureichendem tab_c ausgelöst.

Ich habe versucht, den Endwert in RDD umzuwandeln, aber kein Glück, den gleichen Fehler

val newcos = spark.sparkContext.parallelize(tab_c)

1.Wie verwende ich Cogroup für große Datenmengen?

2.Können wir den gruppierten Wert beibehalten?

Code

 val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

  var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)

val updated = cos.map { x =>
  {

    val key = x._1
    val value = x._2

    srcs = value._1.mkString(",")
    destt = value._2.mkString(",")

    if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
      srcmis :+= srcs
      destmis :+= destt

    }

    if (srcs == "") {

      extraindest :+= destt.mkString("")
    }

    if (destt == "") {

      extrainsrc :+= srcs.mkString("")
    }

  }

}

Code aktualisiert:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
 // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
      {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

ERROR:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Vielen Dank


TL; DR Nicht collect .

Um diesen Code ohne zusätzliche Annahmen sicher auszuführen (die durchschnittlichen Anforderungen für Worker-Knoten sind möglicherweise erheblich geringer), würde jeder Knoten (Treiber und jeder Executor) Speicher benötigen, der den Gesamtspeicherbedarf für alle Daten erheblich übersteigt.

Wenn Sie es außerhalb von Spark ausführen, benötigen Sie nur einen Knoten. Daher bietet Spark hier keine Vorteile.

Wenn Sie jedoch collect.toArray überspringen und einige Annahmen zur Datenverteilung treffen, können Sie es möglicherweise collect.toArray ausführen.







apache-spark-sql