python - GroupBy العمود وتصفية الصفوف مع أقصى قيمة في Pyspark




apache-spark apache-spark-sql (2)

أنا متأكد تقريبًا من أن هذا قد تم طرحه من قبل ، لكن البحث عبر stackoverflow لم يجيب على سؤالي. ليست نسخة مكررة من [2] لأنني أريد الحد الأقصى للقيمة ، وليس العنصر الأكثر شيوعًا. أنا جديد على pyspark وأحاول أن أفعل شيئًا بسيطًا جدًا: أريد أن أجعل العمود تجميع "أ" ثم أحتفظ فقط بصف كل مجموعة لها أقصى قيمة في العمود "B". مثله:

df_cleaned = df.groupBy("A").agg(F.max("B"))

لسوء الحظ ، يؤدي هذا إلى التخلص من جميع الأعمدة الأخرى - تحتوي df_cleaned فقط على الأعمدة "A" والقيمة القصوى لـ B. كيف يمكنني الاحتفاظ بالصفوف بدلاً من ذلك؟ ("أ" ، "ب" ، "ج" ...)


نهج آخر ممكن هو تطبيق الانضمام إلى dataframe مع نفسها تحديد "leftsemi". يتضمن هذا النوع من الربط جميع الأعمدة من ملف البيانات على الجانب الأيسر ولا توجد أعمدة في الجانب الأيمن.

فمثلا:

import pyspark.sql.functions as f
data = [
    ('a', 5, 'c'),
    ('a', 8, 'd'),
    ('a', 7, 'e'),
    ('b', 1, 'f'),
    ('b', 3, 'g')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+

يمكن تحديد الحد الأقصى لقيمة العمود B حسب العمود A من خلال:

df.groupBy('A').agg(f.max('B')
+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+

باستخدام هذا التعبير كجانب الأيمن في رابط شبه الأيسر ، وإعادة تسمية العمود الذي تم الحصول عليه max(B) إلى اسمه الأصلي B ، يمكننا الحصول على النتيجة المطلوبة:

df.join(df.groupBy('A').agg(f.max('B').alias('B')),on='B',how='leftsemi').show()
+---+---+---+
|  B|  A|  C|
+---+---+---+
|  3|  b|  g|
|  8|  a|  d|
+---+---+---+

تختلف الخطة المادية الكامنة وراء هذا الحل والأخرى من الإجابة المقبولة ، ولا يزال من غير الواضح بالنسبة لي أيهما سيكون أفضل أداءً على قواعد البيانات الكبيرة.

يمكن الحصول على نفس النتيجة باستخدام شرارة SQL بناء الجملة:

df.registerTempTable('table')
q = '''SELECT *
FROM table a LEFT SEMI
JOIN (
    SELECT 
        A,
        max(B) as max_B
    FROM table
    GROUP BY A
    ) t
ON a.A=t.A AND a.B=t.max_B
'''
sqlContext.sql(q).show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+

يمكنك القيام بذلك دون udf باستخدام Window .

النظر في المثال التالي:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  5|
#|  a|  8|
#|  a|  7|
#|  b|  1|
#|  b|  3|
#+---+---+

قم بإنشاء Window للتقسيم حسب العمود A واستخدم هذا لحساب الحد الأقصى لكل مجموعة. ثم قم بتصفية الصفوف بحيث تساوي القيمة في العمود B الحد الأقصى.

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  8|
#|  b|  3|
#+---+---+

أو pyspark-sql باستخدام pyspark-sql :

df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#|  A|  B|
#+---+---+
#|  b|  3|
#|  a|  8|
#+---+---+




apache-spark-sql