apache spark - أكثر من ساعة واحدة لتنفيذ pyspark.sql.DataFrame.take(4)



apache-spark apache-spark-sql (1)

أقوم بتشغيل الشرارة 1.6 على 3 VMs (أي 1X رئيسية ؛ 2X عبيد) مع كل 4 النوى وذاكرة الوصول العشوائي 16GB.

أستطيع أن أرى العمال المسجلين على شرارة سيد webUI.

أريد استرداد البيانات من قاعدة بيانات Vertica الخاصة بي للعمل عليها. نظرًا لأنني لم أتمكن من تشغيل استعلامات معقدة ، فقد حاولت فهم استفسارات وهمية. نحن نعتبر هنا مهمة سهلة.

الكود هو:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)

والإخراج هو (ملاحظة: أنا @IPSLAVE بـ @IPSLAVE العبد VM IP: Port):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

كما ترون يستغرق وقتا طويلا reaaaaaally. إن طاولتي كبيرة جدًا (تحتوي على حوالي 220 مليون سطر ، 11 حقلًا لكل منها) ولكن سيتم تنفيذ هذا الاستعلام على الفور باستخدام sql "عادي" (على سبيل المثال pyodbc).

أظن أنني أخطأت في فهم / أخطأت في استخدام سبارك ، هل سيكون لديك أفكار أو نصيحة لجعلها تعمل بشكل أفضل؟


في حين يدعم Spark دفعًا مؤقتًا محدودًا إلى JDBC لجميع العمليات الأخرى ، مثل الحد ، المجموعة ، يتم تنفيذ عمليات التجميع داخليًا. لسوء الحظ ، هذا يعني أن take(4) سيحضر البيانات أولاً ثم يطبق limit . بمعنى آخر ، ستنفذ قاعدة بياناتك (مع افتراض عدم وجود مرشحات) أي شيء يعادل:

SELECT * FROM table 

والباقي سوف يعالجها سبارك. هناك بعض التحسينات المعنية (على وجه الخصوص تقوم Spark بتقييم الأقسام بشكل تكراري للحصول على عدد من السجلات المطلوبة بواسطة LIMIT ) لكنها لا تزال غير فعالة تمامًا مقارنة بالتحسينات من جانب قاعدة البيانات.

إذا كنت تريد دفع limit لقاعدة البيانات ، dbtable عليك القيام بذلك بشكل ثابت باستخدام الاستعلام الفرعي كمعلمة dbtable :

(sqlContext.read.format('jdbc')
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
  "url"     -> "xxxx",
  "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))

يرجى ملاحظة أن الاسم المستعار في الاستعلام الفرعي إلزامي.

ملاحظة :

قد يتم تحسين هذا السلوك في المستقبل ، بمجرد إعداد Data Source API v2:





pyspark-sql