scala - run - spark sql이란




DataFrame의 파티셔닝을 정의하는 방법은 무엇입니까? (4)

Spark 1.4.0에서 Spark SQL 및 DataFrames를 사용하기 시작했습니다. 스칼라의 DataFrames에서 사용자 정의 파티 셔 너를 정의하고 싶지만 어떻게해야합니까?

내가 작업하고있는 데이터 테이블 중 하나에는 계정별로 다음 예제의 silimar 트랜잭션 목록이 포함되어 있습니다.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

적어도 초기에는 대부분의 계산이 계정 내의 거래간에 발생합니다. 따라서 계정에 대한 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 분할하고 싶습니다.

그러나 이것을 정의하는 방법을 보지 못했습니다. DataFrame 클래스에는 만들 파티션 수를 지정할 수있는 'repartition (Int)'이라는 메서드가 있습니다. 그러나 RDD에 지정할 수있는 것과 같이 DataFrame에 대한 사용자 정의 파티 셔 너를 정의하는 데 사용할 수있는 방법이 없습니다.

소스 데이터는 Parquet에 저장됩니다. Parquet에 DataFrame을 쓸 때 분할 할 열을 지정할 수 있으므로 Parquet에게 'Account'열로 데이터를 분할하도록 지시 할 수 있습니다. 그러나 수백만 개의 계정이있을 수 있으며 Parquet를 올바르게 이해하면 각 계정에 대해 별도의 디렉토리를 만들므로 합리적인 솔루션처럼 들리지 않습니다.

계정에 대한 모든 데이터가 동일한 파티션에 있도록 Spark가이 DataFrame을 분할하도록하는 방법이 있습니까?


스파크> = 2.3.0

SPARK-22614 는 범위 분할을 노출합니다.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 Data Source API v2 에서 외부 형식 파티셔닝을 제공합니다.

스파크> = 1.6.0

Spark> = 1.6에서는 쿼리 및 캐싱에 열을 기준으로 파티셔닝을 사용할 수 있습니다. repartition 방법을 사용하는 SPARK-11410 SPARK-4849 를 참조하십시오.

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDs 와 달리 Spark Dataset ( Dataset[Row] 일명 DataFrame )은 현재로서는 사용자 정의 파티 DataFrame 사용할 수 없습니다. 일반적으로 인공 파티셔닝 컬럼을 작성하여이를 해결할 수 있지만 동일한 유연성을 제공하지는 않습니다.

스파크 <1.6.0 :

DataFrame 을 만들기 전에 입력 데이터를 사전 파티션하는 것이 DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

RDD 에서 DataFrame 만들려면 간단한 맵 단계 만 필요하므로 기존 파티션 레이아웃을 유지해야합니다 * :

assert(df.rdd.partitions == partitioned.partitions)

기존 DataFrame 다시 분할 할 수있는 동일한 방법 :

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

따라서 불가능하지 않은 것처럼 보입니다. 이치에 맞다면 문제는 남아 있습니다. 나는 대부분 그렇지 않다고 주장 할 것이다.

  1. 재 파티셔닝은 비용이 많이 드는 프로세스입니다. 일반적인 시나리오에서 대부분의 데이터는 직렬화, 셔플 및 직렬화 해제되어야합니다. 반면에 사전 분할 된 데이터의 이점을 얻을 수있는 작업 수는 비교적 적으며 내부 API가이 속성을 활용하도록 설계되지 않은 경우 더 제한됩니다.

    • 일부 시나리오에서는 참여하지만 내부 지원이 필요합니다.
    • 윈도우 함수는 일치하는 파티 셔너로 호출합니다. 위와 동일하며 단일 창 정의로 제한됩니다. 이미 내부적으로 분할되어 있으므로 사전 분할이 중복 될 수 있습니다.
    • GROUP BY 사용한 간단한 집계-임시 버퍼 **의 메모리 공간을 줄일 수 있지만 전체 비용은 훨씬 높습니다. groupByKey.mapValues(_.reduce) (현재 동작) vs reduceByKey (사전 분할)와 reduceByKey 합니다. 실제로 유용하지는 않습니다.
    • SqlContext.cacheTable 사용한 데이터 압축. 실행 길이 인코딩을 사용하는 것처럼 보이기 때문에 OrderedRDDFunctions.repartitionAndSortWithinPartitions 적용하면 압축률이 향상 될 수 있습니다.
  2. 키 배포에 따라 성능이 크게 좌우됩니다. 비뚤어지면 최적의 리소스 사용률을 얻습니다. 최악의 시나리오에서는 작업을 완료하는 것이 불가능합니다.

  3. 높은 수준의 선언적 API를 사용하는 요점은 낮은 수준의 구현 세부 정보에서 자신을 격리시키는 것입니다. @dwysakowicz 와 @RomiKuntsman 이미 언급했듯이 최적화는 Catalyst Optimizer 의 일입니다. 그것은 매우 정교한 짐승이며 내부에 더 깊이 들어 가지 않고 쉽게 향상시킬 수 있다고 생각합니다.

관련 개념

JDBC 소스로 파티션하기 :

JDBC 데이터 소스는 predicates 인수를 지원합니다. 다음과 같이 사용할 수 있습니다.

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

술어마다 단일 JDBC 파티션을 작성합니다. 개별 술어를 사용하여 작성된 세트가 분리되지 않은 경우 결과 테이블에 중복이 표시됩니다.

DataFrameWriter partitionBy 메소드 :

Spark DataFrameWriter 는 쓰기시 데이터를 "파티션"하는 데 사용할 수있는 partitionBy 메소드를 제공 partitionBy . 제공된 열 세트를 사용하여 쓰기시 데이터를 분리합니다.

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

이를 통해 키를 기반으로하는 쿼리에 대한 술어 푸시 다운을 읽을 수 있습니다.

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

그러나 DataFrame.repartition 과 동일하지 않습니다. 특히 다음과 같은 집계 :

val cnts = df1.groupBy($"k").sum()

여전히 TungstenExchange 가 필요합니다 :

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy 메소드 (Spark> = 2.0) :

bucketBypartitionBy 와 유사한 응용 프로그램을 가지고 있지만 테이블 ( saveAsTable )에만 사용할 수 있습니다. 버킷 팅 정보를 사용하여 조인을 최적화 할 수 있습니다.

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* 파티션 레이아웃 이란 데이터 배포만을 의미합니다. partitioned RDD에는 더 이상 partitioned 셔 너가 없습니다. ** 조기 투영이 없다고 가정합니다. 집계에서 열의 작은 하위 집합 만 다루는 경우에는 아무 것도 얻지 못할 수 있습니다.


RDD를 사용 하여이 작업을 수행 할 수있었습니다. 그러나 이것이 당신에게 적합한 솔루션인지는 모르겠습니다. DF를 RDD로 사용할 수있게되면 repartitionAndSortWithinPartitions 를 적용하여 데이터의 사용자 정의 재 파티셔닝을 수행 할 수 있습니다.

내가 사용한 샘플은 다음과 같습니다.

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)

그래서 어떤 종류의 대답으로 시작하려면 :)-당신은 할 수 없습니다

나는 전문가는 아니지만 DataFrames를 이해하는 한 rdd와 같지 않으며 DataFrame에는 Partitioner와 같은 것이 없습니다.

일반적으로 DataFrame의 아이디어는 이러한 문제 자체를 처리하는 다른 수준의 추상화를 제공하는 것입니다. DataFrame의 쿼리는 논리적 계획으로 변환되어 RDD 작업으로 추가 변환됩니다. 제안한 파티션은 자동으로 적용되거나 최소한 적용되어야합니다.

SparkSQL이 어떤 종류의 최적의 작업을 제공 할 것이라고 신뢰하지 않는 경우 의견에서 제안한대로 DataFrame을 RDD [Row]로 항상 변환 할 수 있습니다.


다음에서 반환 한 DataFrame을 사용하십시오.

yourDF.orderBy(account)

명시적인 방법은 데이터 프레임에서, PairRDD에서만 partitionBy 를 사용할 수는 없지만, DataFrame을 정렬 할 때는 LogicalPlan에서이를 사용하므로 각 계정에서 계산해야 할 때 도움이됩니다.

방금 계정별로 분할하려는 데이터 프레임과 동일한 정확한 문제가 발생했습니다. "계정의 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 분할하고 싶다"고 말하면 규모와 성능을 원하지만 코드는 의존하지 않습니다. mapPartitions() 등)?





partitioning