scala - spark left join



用Spark數據集在Scala中進行類型化的連接 (1)

意見

如果連接條件基於等於運算符,則Spark SQL可以優化連接。 這意味著我們可以分別考慮外源性和非外源性。

等值連接

通過將兩個Datasets映射到(鍵,值)元組,根據鍵執行連接並重新調整結果,Equijoin可以以類型安全的方式實現:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

非等值連接

可以使用關係代數算子表示為R⋈θS =σθ(R×S)並直接轉換為代碼。

Spark 2.0

啟用crossJoin並使用joinWith與謂詞相等的謂詞:

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Spark 2.1

使用crossJoin方法:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

例子

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

筆記

  • 應該注意的是,這些方法與直接joinWith應用程序有本質區別,並且需要昂貴的DeserializeToObject / SerializeFromObject轉換(與直接joinWith可以對數據使用邏輯操作相比)。

    這與Spark 2.0 Dataset vs DataFrame中描述的行為類似。

  • 如果您不限於Spark SQL API, framelessDatasets提供有趣的類型安全擴展(截至今天,僅支持Spark 2.0):

    import frameless.TypedDataset
    
    val typedPoints1 = TypedDataset.create(points1)
    val typedPoints2 = TypedDataset.create(points2)
    
    typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
  • Dataset API在1.6中不穩定,因此我認為在這裡使用它是沒有意義的。

  • 當然這個設計和描述性的名字是沒有必要的。 你可以很容易地使用類型來隱式地將這個方法添加到Dataset集中,並且與內置的簽名沒有衝突,因此都可以被稱為joinWith

我喜歡Spark數據集,因為它們在編譯時給我分析錯誤和語法錯誤,還允許我使用getter而不是硬編碼的名稱/數字。 大部分計算都可以使用Dataset的高級API來完成。 例如,通過訪問數據集類型對象,比使用RDD行的數據字段執行agg,select,sum,avg,map,filter或groupBy操作要簡單得多。

然而,這個連接操作是缺少的,我讀了,我可以做這樣的連接

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

但是這不是我想要的,因為我更喜歡通過case類接口來實現,所以更像這樣

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

現在最好的選擇似乎是在case類旁邊創建一個對象,並給這個函數提供正確的列名作為String。 所以我會使用第一行代碼,而不是一個硬編碼的列名稱函數。 但是,這並不夠高雅。

有人能告訴我在這裡的其他選項嗎? 我們的目標是從實際的列名抽像出來,最好通過case類的getters來工作。

我正在使用Spark 1.6.1和Scala 2.10





apache-spark-sql