python - GroupBy Spalten- und Filterzeilen mit Maximalwert in Pyspark




apache-spark apache-spark-sql (2)

Ich bin mir fast sicher, dass dies schon einmal gefragt wurde, aber eine Suche durch Stackoverflow hat meine Frage nicht beantwortet. Kein Duplikat von [2] da ich den Maximalwert und nicht das häufigste Element haben möchte. Ich bin neu in Pyspark und versuche, etwas wirklich Einfaches zu tun: Ich möchte durch Spalte "A" groupBy und dann nur die Zeile jeder Gruppe behalten, die den Maximalwert in Spalte "B" hat. So was:

df_cleaned = df.groupBy("A").agg(F.max("B"))

Leider wirft dies alle anderen Spalten weg - df_cleaned enthält nur die Spalten "A" und den Maximalwert von B. Wie behalte ich stattdessen die Zeilen? ("A", "B", "C" ...)


Ein anderer möglicher Ansatz besteht darin, den Datenrahmen mit der Angabe von "leftsemi" zu verbinden. Diese Art der Verknüpfung umfasst alle Spalten des Datenrahmens auf der linken Seite und keine Spalten auf der rechten Seite.

Zum Beispiel:

import pyspark.sql.functions as f
data = [
    ('a', 5, 'c'),
    ('a', 8, 'd'),
    ('a', 7, 'e'),
    ('b', 1, 'f'),
    ('b', 3, 'g')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+

Der Maximalwert von Spalte B nach Spalte A kann folgendermaßen ausgewählt werden:

df.groupBy('A').agg(f.max('B')
+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+

Wenn Sie diesen Ausdruck als rechte Seite in einem linken Semi-Join verwenden und die erhaltene Spalte max(B) wieder in den ursprünglichen Namen B umbenennen, erhalten Sie das gewünschte Ergebnis:

df.join(df.groupBy('A').agg(f.max('B').alias('B')),on='B',how='leftsemi').show()
+---+---+---+
|  B|  A|  C|
+---+---+---+
|  3|  b|  g|
|  8|  a|  d|
+---+---+---+

Der physikalische Plan hinter dieser Lösung und der von der akzeptierten Antwort unterscheiden sich, und es ist mir immer noch nicht klar, welcher Plan bei großen Datenrahmen besser abschneidet.

Das gleiche Ergebnis kann mit der Spark-SQL-Syntax erzielt werden:

df.registerTempTable('table')
q = '''SELECT *
FROM table a LEFT SEMI
JOIN (
    SELECT 
        A,
        max(B) as max_B
    FROM table
    GROUP BY A
    ) t
ON a.A=t.A AND a.B=t.max_B
'''
sqlContext.sql(q).show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+

Sie können dies ohne ein udf indem Sie ein Window .

Betrachten Sie das folgende Beispiel:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  5|
#|  a|  8|
#|  a|  7|
#|  b|  1|
#|  b|  3|
#+---+---+

Erstellen Sie ein Window für die Partitionierung nach Spalte A und verwenden Sie dieses Window , um das Maximum jeder Gruppe zu berechnen. Filtern Sie dann die Zeilen so heraus, dass der Wert in Spalte B gleich dem max ist.

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  8|
#|  b|  3|
#+---+---+

Oder gleichwertig mit pyspark-sql :

df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#|  A|  B|
#+---+---+
#|  b|  3|
#|  a|  8|
#+---+---+




apache-spark-sql