apache-spark - شرح - spark برنامج




كيف تقرأ ملفات نصية متعددة في RDD واحد؟ (6)

أرغب في قراءة مجموعة من الملفات النصية من موقع hdfs وتنفيذ الخرائط عليها في التكرار باستخدام شرارة.

JavaRDD<String> records = ctx.textFile(args[1], 1); قادر على قراءة ملف واحد فقط في كل مرة.

أريد قراءة أكثر من ملف واحد ومعالجتها كأحد RDD. ماذا؟


استخدام union النحو التالي:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

ثم bigRdd هو RDD مع جميع الملفات.


جميع الإجابات صحيحة مع sc.textFile

كنت أتساءل فقط لماذا لا wholeTextFiles في هذه الحالة ...

sc.wholeTextFiles(yourfileListFromFolder.mkString(","))
  .flatMap{case (path, text) 
...

أحد القيود هو أنه يتعين علينا تحميل الملفات الصغيرة وإلا فإن الأداء سيكون سيئًا وقد يؤدي إلى OOM.

ملحوظة :

  • يجب أن تتلاءم fullfile في الذاكرة
  • جيد لتنسيقات الملفات التي لا يمكن تقسيمها حسب السطر ... مثل ملفات XML

مزيد من الرجوع إلى visit


هناك حل نظيف وواضح للأمام. استخدم الأسلوب wholeTextFiles (). سيأخذ هذا دليلاً ويشكل زوجًا ذا قيمة أساسية. سوف يكون RDD المرتجع زوج RDD. أوجد أسفل الوصف من مستندات Spark :

يتيح لك SparkContext.wholeTextFiles قراءة دليل يحتوي على عدة ملفات نصية صغيرة ، وإرجاع كل واحد منها كأزواج (اسم الملف ، والمحتوى). هذا على النقيض من textFile ، الذي سيعود سجل واحد لكل سطر في كل ملف


يمكنك استخدام - JavaRDD records = sc.wholeTextFiles ("مسار دليلك") هنا سوف تحصل على مسار ملفك ومحتوى هذا الملف. بحيث يمكنك تنفيذ أي إجراء لملف بأكمله في الوقت الذي يحفظ مقدار الحمل


يمكنك استخدام هذا

أولاً يمكنك الحصول على Buffer / List of S3 Paths:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

الآن تمرير كائن قائمة هذا إلى قطعة التعليمات البرمجية التالية ، ملاحظة: sc هو كائن SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

الآن حصلت على RDD النهائي الموحد أي Df

اختياري ، ويمكنك أيضًا إعادة تقسيمه في BigRDD واحد

val files = sc.textFile(filename, 1).repartition(1)

إعادة تقسيم العمل يعمل دائما: د


يمكنك تحديد أدلة كاملة ، واستخدام أحرف البدل وحتى CSV من الدلائل وحروف البدل. على سبيل المثال:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

كما يشير نيك شماس إلى هذا هو التعرض ل FileInputFormat Hadoop ، وبالتالي هذا يعمل أيضا مع Hadoop (و Scalding).





apache-spark