apache spark - Spark 윈도우 함수에서 단일 파티션 모드의 성능 영향 방지



apache-spark pyspark (1)

내 질문은 스파크 데이터 프레임의 연속 행 간의 차이를 계산하는 유스 케이스에 의해 트리거됩니다.

예를 들어, 나는 가지고있다 :

>>> df.show()
+-----+----------+
|index|      col1|
+-----+----------+
|  0.0|0.58734024|
|  1.0|0.67304325|
|  2.0|0.85154736|
|  3.0| 0.5449719|
+-----+----------+

"Window"함수를 사용하여 이들을 계산하도록 선택하면 다음과 같이 할 수 있습니다.

>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index|      col1| diffs_col1|
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|
|  2.0|0.85154736|-0.30657548|
|  3.0| 0.5449719|       null|
+-----+----------+-----------+

질문 : 데이터 프레임을 단일 파티션으로 명시 적으로 분할했습니다. 이 성능에 미치는 영향은 무엇이며, 존재한다면 그 이유는 무엇이며 어떻게 피할 수 있습니까? 파티션을 지정하지 않으면 다음 경고가 표시됩니다.

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

실제로 partitionBy 절을 생략하면 성능에 미치는 영향은 거의 동일합니다. 모든 레코드는 단일 파티션으로 셔플되어 로컬로 정렬되고 순차적으로 순차적으로 반복됩니다.

차이는 전체적으로 생성 된 파티션의 수에만 있습니다. 10 개의 파티션과 1000 개의 레코드가있는 간단한 데이터 세트를 사용한 예를 들어 설명해 보겠습니다.

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))

partition by 절없이 프레임을 정의하면

w_unpart = Window.orderBy(f.col("index").asc())

lag 과 함께 사용하십시오.

df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)

총 하나의 파티션 만 존재합니다.

df_lag_unpart.rdd.glom().map(len).collect()
[1000]

더미 인덱스가있는 프레임 정의와 비교 (코드에 비해 약간 단순화 :

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

spark.sql.shuffle.partitions 동일한 개수의 파티션을 사용합니다.

spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)

df_lag_part.rdd.glom().count()
11

비어 있지 않은 파티션이 하나만있는 경우

df_lag_part.rdd.glom().filter(lambda x: x).count()
1

불행히도 PySpark에서이 문제를 해결하는 데 사용할 수있는 보편적 인 해결책은 없습니다. 이것은 분산 처리 모델과 결합 된 구현의 고유 한 메커니즘입니다.

index 열은 순차적이므로 블록 당 고정 된 수의 레코드로 인공 파티셔닝 키를 생성 할 수 있습니다.

rec_per_block  = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(
    "block", (f.col("index") / rec_per_block).cast("int")
)

프레임 지정을 정의하는 데 사용합니다.

w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)

예상되는 파티션 수를 사용합니다.

df_lag_with_block.rdd.glom().count()
11

대략 균일 한 데이터 분포 (해시 충돌을 피할 수 없음) :

df_lag_with_block.rdd.glom().map(len).collect()
[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

그러나 블록 경계에 많은 차이가 있습니다.

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()
12

경계가 계산하기 쉽기 때문에 :

from itertools import chain

boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers
    # This could be generalized to any monotonically increasing
    # id by taking min and max per block
    (idx - 1, idx) for idx in 
    df_lag_with_block.groupBy("block").min("index")
        .drop("block").rdd.flatMap(lambda x: x)
        .collect()))[2:]  # The first boundary doesn't carry useful inf.

당신은 항상 선택할 수 있습니다 :

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

별도로 기입하십시오.

# We use window without partitions here. Since number of records
# will be small this won't be a performance issue
# but will generate "Moving all data to a single partition" warning
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))

join :

combined = (df_lag_with_block
    .join(missing_with_lag, ["index"], "leftouter")
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

원하는 결과를 얻으려면 :

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
    combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0




window-functions