apache spark - كيفية جعل أمثلة استنساخ اباتشي سبارك جيدة




apache-spark dataframe (3)

لقد قضيت وقتًا لا بأس به في قراءة بعض الأسئلة من خلال pyspark spark-dataframe وكثيراً ما أجد أن الملصقات لا توفر معلومات كافية لفهم أسئلتهم حقًا. عادةً ما أطلب منهم أن MCVE لكن في بعض الأحيان ، MCVE على إظهار بعض بيانات المدخلات والمخرجات يشبه سحب الأسنان. على سبيل المثال: انظر التعليقات على هذا السؤال .

ربما يكون جزء من المشكلة هو أن الأشخاص لا يعرفون كيفية إنشاء MCVE بسهولة لبيانات الشرارة. أعتقد أنه سيكون من المفيد الحصول على نسخة شرارة من dataframe لسؤال الباندا كدليل يمكن ربطه.

إذن كيف يمكن للمرء أن يخلق مثالاً جيدًا وقابل للتكرار؟


ضبط الأداء

إذا كان السؤال متعلقًا بضبط الأداء ، فيرجى تضمين المعلومات التالية.

خطة التنفيذ

من الأفضل تضمين خطة التنفيذ الموسعة . في بيثون:

df.explain(True) 

في سكالا:

df.explain(true)

أو خطة التنفيذ الموسعة مع الإحصاءات . في بيثون:

print(df._jdf.queryExecution().stringWithStats())

في سكالا:

df.queryExecution.stringWithStats

وضع ومعلومات الكتلة

  • mode - local ، client ، `الكتلة.
  • مدير المجموعة (إن وجد) - لا شيء (الوضع المحلي) ، مستقل ، YARN ، Mesos ، Kubernetes.
  • معلومات التكوين الأساسية (عدد النوى ، ذاكرة المنفذ).

توقيت المعلومات

البطيء نسبي ، خاصةً عند قيامك بمنفذ التطبيق غير الموزع أو إذا كنت تتوقع انخفاض زمن الوصول يمكن استرجاع التوقيت الدقيق للمهام والمراحل المختلفة من jobs Spark UI ( sc.uiWebUrl ) أو Spark REST UI.

استخدام أسماء موحدة للسياقات

يتيح لنا استخدام الأسماء المحددة لكل سياق إعادة إنتاج المشكلة بسرعة.

  • sc - ل SparkContext .
  • sqlContext - لـ SQLContext .
  • spark - ل SparkSession .

تقديم معلومات النوع ( سكالا )

يعد الاستنتاج القوي للكتابة واحدًا من أكثر ميزات Scala المفيدة ، ولكنه يصعب تحليل الشفرة المأخوذة من السياق. حتى لو كانت الكتابة واضحة من السياق ، فمن الأفضل أن تشرح المتغيرات. تفضل

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

على

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

الأدوات الشائعة الاستخدام يمكن أن تساعدك:

  • spark-shell / قذيفة سكالا

    استخدام :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
  • فكرة InteliJ

    استخدم Alt + =


تقديم بيانات عينة صغيرة ، والتي يمكن إعادة صياغتها بسهولة.

على الأقل ، يجب أن توفر الملصقات بضعة صفوف وأعمدة على قاعدة بياناتها ورمزها الذي يمكن استخدامه لإنشاءه بسهولة. من السهل ، يعني قص ولصق. اجعله صغيرًا قدر الإمكان لإظهار مشكلتك.

لدي dataframe التالية:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

والتي يمكن إنشاؤها باستخدام هذا الرمز:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

عرض الإخراج المطلوب.

اطرح سؤالك المحدد واوضح لنا المخرجات المطلوبة.

كيف يمكنني إنشاء عمود جديد 'is_divisible' له القيمة 'yes' إذا كان يوم شهر 'date' زائد 7 أيام قابل للقسمة على القيمة في العمود 'X' و 'no' خلاف ذلك؟

النتيجة المرجوة:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

اشرح كيفية الحصول على الإخراج الخاص بك.

اشرح بتفصيل كبير كيف تحصل على المخرجات المطلوبة. يساعد على إظهار مثال حساب.

على سبيل المثال في الصف 1 ، X = 1 والتاريخ = 2017-01-01. مضيفا 7 أيام حتى الآن غلة 2017-01-08. يوم الشهر هو 8 وبما أن 8 قابلة للقسمة على 1 ، فإن الجواب هو "نعم".

وبالمثل ، بالنسبة للصف الأخير X = 7 والتاريخ = 2017-01-04. إضافة 7 إلى تاريخ العائد 11 كـ يوم الشهر. بما أن 11٪ 7 ليست 0 ، فإن الإجابة هي "لا".

مشاركة الرمز الحالي الخاص بك.

أطلعنا على ما قمت به أو جربته ، بما في ذلك كل الكود * حتى لو كان لا يعمل. أخبرنا بالمكان الذي تتعثر فيه ، وإذا تلقيت خطأ ، فيرجى تضمين رسالة الخطأ.

(* يمكنك استبعاد الكود لإنشاء سياق الشرارة ، لكن يجب عليك تضمين جميع الواردات.)

أعرف كيفية إضافة عمود جديد هو date زائد 7 أيام ، لكنني أواجه مشكلة في الحصول على يوم من الشهر كعدد صحيح.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

قم بتضمين الإصدارات والواردات واستخدام تمييز بناء الجملة

  • التفاصيل الكاملة في هذه الإجابة كتبها desertnaut .

لوظائف ضبط الأداء ، قم بتضمين خطة التنفيذ

  • التفاصيل الكاملة في هذه الإجابة كتبها user8371915 .
  • يساعد على استخدام أسماء موحدة للسياقات.

تحليل ملفات الإخراج شرارة

  • قدمت MaxU مفيدًا في هذه الإجابة للمساعدة في تحليل ملفات Spark الإخراج إلى DataFrame.

الملاحظات الأخرى.

  • تأكد من قراءة كيفية طرح السؤال MCVE أولاً.
  • قراءة الإجابات الأخرى لهذا السؤال ، والتي ترتبط أعلاه.
  • الحصول على لقب وصفي جيد.
  • كن مهذبا. الأشخاص في SO متطوعون ، لذا اسأل جيدًا.

قد تساعد وظيفة المساعد الصغيرة هذه في تحليل ملفات الإخراج Spark إلى DataFrame:

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

سكالا:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

الاستعمال:

df = read_spark_output("file:///tmp/spark.out")

ملاحظة: بالنسبة لـ pyspark ، يتوفر eqNullSafe من spark 2.3 .





pyspark-sql