scala - كيفية استخدام COGROUP لمجموعات البيانات الكبيرة




apache-spark apache-spark-sql (2)

عندما تستخدم collect() فإنك تخبر الشرارة أساسًا بنقل جميع البيانات الناتجة إلى العقدة الرئيسية ، والتي يمكن أن تنتج عنق الزجاجة بسهولة. لم تعد تستخدم Spark في تلك المرحلة ، بل مجرد مجموعة بسيطة في جهاز واحد.

لتشغيل الحساب ، استخدم فقط شيئًا يتطلب البيانات في كل عقدة ، وهذا هو السبب في أن المسؤولين التنفيذيين يعيشون على قمة نظام الملفات الموزع. على سبيل المثال saveAsTextFile() .

فيما يلي بعض الأمثلة الأساسية.

تذكر أن الهدف بأكمله هنا (أي إذا كان لديك بيانات كبيرة) هو نقل الكود إلى بياناتك والحساب هناك ، وليس إحضار جميع البيانات إلى الحساب.

لديّ اثنان من rdd's وهما val tab_a: RDD[(String, String)] و val tab_b: RDD[(String, String)] أنا أستخدم cogroup لمجموعات البيانات مثل:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  {
 //somecode
  }
}

أنا أستخدم قيم tab_c tab_c لوظيفة الخريطة ، وهي تعمل بشكل جيد لمجموعات البيانات الصغيرة ، لكن في حالة وجود مجموعات بيانات ضخمة ، فإنها Out Of Memory exception .

لقد حاولت تحويل القيمة النهائية إلى RDD ولكن لا حظ نفس الخطأ

val newcos = spark.sparkContext.parallelize(tab_c)

1. كيفية استخدام cogroup لمجموعات البيانات الكبيرة؟

2. يمكن أن نستمر في القيمة cogrouped؟

الشفرة

 val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

  var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)

val updated = cos.map { x =>
  {

    val key = x._1
    val value = x._2

    srcs = value._1.mkString(",")
    destt = value._2.mkString(",")

    if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
      srcmis :+= srcs
      destmis :+= destt

    }

    if (srcs == "") {

      extraindest :+= destt.mkString("")
    }

    if (destt == "") {

      extrainsrc :+= srcs.mkString("")
    }

  }

}

تم تحديث الكود:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
 // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
      {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

خطأ:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

شكرا لك


TL ؛ DR لا collect .

لتشغيل هذا الرمز بأمان ، وبدون افتراضات إضافية (في المتوسط ​​قد تكون متطلبات العقد المنفصلة أصغر بكثير) ، ستحتاج كل عقدة (برنامج تشغيل وكل منفذ تنفيذي) إلى ذاكرة تتجاوز متطلبات الذاكرة الإجمالية بشكل كبير لجميع البيانات.

إذا كنت تريد تشغيله خارج Spark ، فستحتاج إلى عقدة واحدة فقط. لذلك لا توفر Spark أي فوائد هنا.

ومع ذلك ، إذا تخطيت collect.toArray وقمت ببعض الافتراضات حول توزيع البيانات ، فقد تقوم بتشغيله على ما يرام.





apache-spark-sql