scala - Wie vermeide ich doppelte Spalten nach dem Join?
apache-spark apache-spark-sql (5)
Die einfache Antwort (aus der Databricks-FAQ zu diesem Thema ) besteht darin, den Join auszuführen, bei dem die verknüpften Spalten als Array von Zeichenfolgen (oder als eine Zeichenfolge) anstelle eines Prädikats ausgedrückt werden.
Unten finden Sie ein Beispiel, das aus den häufig gestellten Fragen (FAQ) von Databricks angepasst wurde, jedoch zwei Verknüpfungsspalten enthält, um die Frage des ursprünglichen Posters zu beantworten.
Hier ist der linke Datenrahmen:
val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))
val left = llist.toDF("firstname","lastname","date","duration")
left.show()
/*
+---------+--------+----------+--------+
|firstname|lastname| date|duration|
+---------+--------+----------+--------+
| bob| b|2015-01-13| 4|
| alice| a|2015-04-23| 10|
+---------+--------+----------+--------+
*/
Hier ist der richtige Datenrahmen:
val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")
right.show()
/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
| alice| a| 100|
| bob| b| 23|
+---------+--------+------+
*/
Hier ist eine
falsche
Lösung, bei der die Verknüpfungsspalten als das Prädikat
left("firstname")===right("firstname") && left("lastname")===right("lastname")
.
Das falsche Ergebnis ist, dass die Spalten
lastname
und
lastname
im
lastname
dupliziert werden:
left.join(right, left("firstname")===right("firstname") &&
left("lastname")===right("lastname")).show
/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname| date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
| bob| b|2015-01-13| 4| bob| b| 23|
| alice| a|2015-04-23| 10| alice| a| 100|
+---------+--------+----------+--------+---------+--------+------+
*/
Die
richtige
Lösung besteht darin, die Verknüpfungsspalten als ein Array von Zeichenfolgen
Seq("firstname", "lastname")
.
Der Ausgabedatenrahmen enthält keine doppelten Spalten:
left.join(right, Seq("firstname", "lastname")).show
/*
+---------+--------+----------+--------+------+
|firstname|lastname| date|duration|upload|
+---------+--------+----------+--------+------+
| bob| b|2015-01-13| 4| 23|
| alice| a|2015-04-23| 10| 100|
+---------+--------+----------+--------+------+
*/
Ich habe zwei Datenrahmen mit den folgenden Spalten:
df1.columns
// Array(ts, id, X1, X2)
und
df2.columns
// Array(ts, id, Y1, Y2)
Nachdem ich es getan habe
val df_combined = df1.join(df2, Seq(ts,id))
Am Ende habe ich folgende Spalten:
Array(ts, id, X1, X2, ts, id, Y1, Y2)
.
Ich konnte damit rechnen, dass die gemeinsamen Spalten wegfallen würden.
Gibt es etwas, das zusätzlich getan werden muss?
Dies ist ein erwartetes Verhalten.
DataFrame.join
Methode entspricht der folgenden SQL-Verknüpfung
SELECT * FROM a JOIN b ON joinExprs
Wenn Sie doppelte Spalten ignorieren möchten, legen Sie sie einfach ab oder wählen Sie anschließend die gewünschten Spalten aus.
Wenn Sie eine
DataFrames
möchten, können Sie übergeordnete
DataFrames
verwenden, um auf diese
DataFrames
:
val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???
a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))
oder benutze Aliase:
// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")
Für Equi-Joins gibt es eine spezielle Shortcut-Syntax, die entweder eine Folge von Strings akzeptiert :
val usingColumns: Seq[String] = ???
a.join(b, usingColumns)
oder als Einzelsaite
val usingColumn: String = ???
a.join(b, usingColumn)
die nur eine Kopie der Spalten behalten, die in einer Verknüpfungsbedingung verwendet werden.
Es wird empfohlen, den Spaltennamen vor dem Beitritt in beiden DFs zu ändern und entsprechend zu löschen.
df1.columns = [ID, Alter, Einkommen] df2.column = [ID, Altersgruppe]
df1.join (df2, on = df1.id == df2.id, how = 'inner'). write.saveAsTable ('table_name')
// gibt error während error für doppelte Spalten zurück
// versuche es stattdessen
df1.join (df2.withColumnRenamed ('id', 'id_2'), on = df1.id == df2.id_2, how = 'inner'). drop ('id_2')
Ich bin schon eine Weile damit festgefahren, und erst kürzlich habe ich eine Lösung gefunden, die recht einfach ist.
Sagen wir a ist
scala> val a = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]
scala> a.show
+---+----+
|key|vala|
+---+----+
| a| 1|
| b| 2|
+---+----+
and
scala> val b = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]
scala> b.show
+---+----+
|key|valb|
+---+----+
| a| 1|
+---+----+
und ich kann dies tun, um nur den Wert in dataframe a auszuwählen:
scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
| a| 1|
| b| 2|
+---+----+
Versuche dies,
val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))