scala - كيفية استخدام COGROUP لمجموعات البيانات الكبيرة




apache-spark apache-spark-sql (2)

لديّ اثنان من rdd's وهما val tab_a: RDD[(String, String)] و val tab_b: RDD[(String, String)] أنا أستخدم cogroup لمجموعات البيانات مثل:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  {
 //somecode
  }
}

أنا أستخدم قيم tab_c tab_c لوظيفة الخريطة ، وهي تعمل بشكل جيد لمجموعات البيانات الصغيرة ، لكن في حالة وجود مجموعات بيانات ضخمة ، فإنها Out Of Memory exception .

لقد حاولت تحويل القيمة النهائية إلى RDD ولكن لا حظ نفس الخطأ

val newcos = spark.sparkContext.parallelize(tab_c)

1. كيفية استخدام cogroup لمجموعات البيانات الكبيرة؟

2. يمكن أن نستمر في القيمة cogrouped؟

الشفرة

 val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

  var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)

val updated = cos.map { x =>
  {

    val key = x._1
    val value = x._2

    srcs = value._1.mkString(",")
    destt = value._2.mkString(",")

    if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
      srcmis :+= srcs
      destmis :+= destt

    }

    if (srcs == "") {

      extraindest :+= destt.mkString("")
    }

    if (destt == "") {

      extrainsrc :+= srcs.mkString("")
    }

  }

}

تم تحديث الكود:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
 // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
      {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

خطأ:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

شكرا لك


عندما تستخدم collect() فإنك تخبر الشرارة أساسًا بنقل جميع البيانات الناتجة إلى العقدة الرئيسية ، والتي يمكن أن تنتج عنق الزجاجة بسهولة. لم تعد تستخدم Spark في تلك المرحلة ، بل مجرد مجموعة بسيطة في جهاز واحد.

لتشغيل الحساب ، استخدم فقط شيئًا يتطلب البيانات في كل عقدة ، وهذا هو السبب في أن المسؤولين التنفيذيين يعيشون على قمة نظام الملفات الموزع. على سبيل المثال saveAsTextFile() .

فيما يلي بعض الأمثلة الأساسية.

تذكر أن الهدف بأكمله هنا (أي إذا كان لديك بيانات كبيرة) هو نقل الكود إلى بياناتك والحساب هناك ، وليس إحضار جميع البيانات إلى الحساب.


TL ؛ DR لا collect .

لتشغيل هذا الرمز بأمان ، وبدون افتراضات إضافية (في المتوسط ​​قد تكون متطلبات العقد المنفصلة أصغر بكثير) ، ستحتاج كل عقدة (برنامج تشغيل وكل منفذ تنفيذي) إلى ذاكرة تتجاوز متطلبات الذاكرة الإجمالية بشكل كبير لجميع البيانات.

إذا كنت تريد تشغيله خارج Spark ، فستحتاج إلى عقدة واحدة فقط. لذلك لا توفر Spark أي فوائد هنا.

ومع ذلك ، إذا تخطيت collect.toArray وقمت ببعض الافتراضات حول توزيع البيانات ، فقد تقوم بتشغيله على ما يرام.





apache-spark-sql