scala - Wie vermeide ich doppelte Spalten nach dem Join?




apache-spark apache-spark-sql (5)

Die einfache Antwort (aus der Databricks-FAQ zu diesem Thema ) besteht darin, den Join auszuführen, bei dem die verknüpften Spalten als Array von Zeichenfolgen (oder als eine Zeichenfolge) anstelle eines Prädikats ausgedrückt werden.

Unten finden Sie ein Beispiel, das aus den häufig gestellten Fragen (FAQ) von Databricks angepasst wurde, jedoch zwei Verknüpfungsspalten enthält, um die Frage des ursprünglichen Posters zu beantworten.

Hier ist der linke Datenrahmen:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))

val left = llist.toDF("firstname","lastname","date","duration")

left.show()

/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

Hier ist der richtige Datenrahmen:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")

right.show()

/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

Hier ist eine falsche Lösung, bei der die Verknüpfungsspalten als das Prädikat left("firstname")===right("firstname") && left("lastname")===right("lastname") .

Das falsche Ergebnis ist, dass die Spalten lastname und lastname im lastname dupliziert werden:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show

/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

Die richtige Lösung besteht darin, die Verknüpfungsspalten als ein Array von Zeichenfolgen Seq("firstname", "lastname") . Der Ausgabedatenrahmen enthält keine doppelten Spalten:

left.join(right, Seq("firstname", "lastname")).show

/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/

Ich habe zwei Datenrahmen mit den folgenden Spalten:

df1.columns
//  Array(ts, id, X1, X2)

und

df2.columns
//  Array(ts, id, Y1, Y2)

Nachdem ich es getan habe

val df_combined = df1.join(df2, Seq(ts,id))

Am Ende habe ich folgende Spalten: Array(ts, id, X1, X2, ts, id, Y1, Y2) . Ich konnte damit rechnen, dass die gemeinsamen Spalten wegfallen würden. Gibt es etwas, das zusätzlich getan werden muss?


Dies ist ein erwartetes Verhalten. DataFrame.join Methode entspricht der folgenden SQL-Verknüpfung

SELECT * FROM a JOIN b ON joinExprs

Wenn Sie doppelte Spalten ignorieren möchten, legen Sie sie einfach ab oder wählen Sie anschließend die gewünschten Spalten aus. Wenn Sie eine DataFrames möchten, können Sie übergeordnete DataFrames verwenden, um auf diese DataFrames :

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???

a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

oder benutze Aliase:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

Für Equi-Joins gibt es eine spezielle Shortcut-Syntax, die entweder eine Folge von Strings akzeptiert :

val usingColumns: Seq[String] = ???

a.join(b, usingColumns)

oder als Einzelsaite

val usingColumn: String = ???

a.join(b, usingColumn)

die nur eine Kopie der Spalten behalten, die in einer Verknüpfungsbedingung verwendet werden.


Es wird empfohlen, den Spaltennamen vor dem Beitritt in beiden DFs zu ändern und entsprechend zu löschen.

df1.columns = [ID, Alter, Einkommen] df2.column = [ID, Altersgruppe]

df1.join (df2, on = df1.id == df2.id, how = 'inner'). write.saveAsTable ('table_name')

// gibt error während error für doppelte Spalten zurück

// versuche es stattdessen

df1.join (df2.withColumnRenamed ('id', 'id_2'), on = df1.id == df2.id_2, how = 'inner'). drop ('id_2')


Ich bin schon eine Weile damit festgefahren, und erst kürzlich habe ich eine Lösung gefunden, die recht einfach ist.

Sagen wir a ist

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]

scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
and 
scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]

scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

und ich kann dies tun, um nur den Wert in dataframe a auszuwählen:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+

Versuche dies,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))