scala - write - 스파크 파티션




HashPartitioner는 어떻게 작동합니까? (2)

데이터 셋을 조금 더 흥미롭게 만들자.

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

우리는 6 가지 요소를 가지고 있습니다 :

rdd.count
Long = 6

파티 셔너 없음 :

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

그리고 8 개의 파티션 :

rdd.partitions.length
Int = 8

이제 작은 도우미를 정의하여 파티션 당 요소 수를 계산할 수 있습니다.

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

파티 셔 너가 없기 때문에 데이터 세트가 파티션간에 균일하게 분배됩니다 ( Spark의 기본 파티션 구성표 ).

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

이제 데이터 셋을 다시 파티션하자 :

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

HashPartitioner 전달 된 매개 변수는 파티션 수를 정의하므로 하나의 파티션이 필요합니다.

rddOneP.partitions.length
Int = 1

파티션이 하나뿐이므로 모든 요소가 포함됩니다.

countByPartition(rddOneP).collect
Array[Int] = Array(6)

셔플 후 값의 순서는 비 결정적입니다.

HashPartitioner(2) 사용하는 경우와 동일한 방법

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

우리는 2 개의 파티션을 얻을 것이다 :

rddTwoP.partitions.length
Int = 2

rdd 는 키 데이터로 분할되므로 더 이상 균일하게 배포되지 않습니다.

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

세 개의 키와 두 개의 다른 hashCode mod numPartitions 여기에는 예상치 못한 것이 없습니다.

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

위의 내용을 확인하십시오.

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

마지막으로 HashPartitioner(7) 를 사용하여 7 개의 파티션을 얻습니다. 각각 3 개의 비어 있지 않은 2 개의 요소가 있습니다.

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

요약 및 메모

  • HashPartitioner 는 파티션 수를 정의하는 단일 인수를 사용합니다.
  • 값은 hash 키를 사용하여 파티션에 할당됩니다. hash 함수는 언어에 따라 다를 수 있습니다 (Scala RDD는 hashCode 사용하고 DataSets 은 MurmurHash 3, PySpark, portable_hash ).

    키가 작은 정수인 경우와 같이 간단한 경우, hash 는 ID ( i = hash(i) )라고 가정 할 수 있습니다.

    스칼라 API는 nonNegativeMod 를 사용하여 계산 된 해시를 기반으로 파티션을 결정합니다.

  • 키 분배가 균일하지 않은 경우 클러스터의 일부가 유휴 상태 인 상황에서 종료 될 수 있습니다.

  • 키는 해시 가능해야합니다. PySpark의 reduceByKey 가 PySpark 특정 문제에 대해 읽을 수 있도록 A 목록에 대한 답변을 확인할 수 있습니다. 또 다른 가능한 문제는 HashPartitioner 설명서에서 강조됩니다.

    Java 배열에는 해당 내용이 아닌 배열의 ID를 기반으로하는 hashCode가 있으므로 HashPartitioner를 사용하여 RDD [Array [ ]] 또는 RDD [(Array [ ], _)]를 분할하려고하면 예기치 않은 결과가 발생합니다.

  • 파이썬 3에서는 해싱이 일관성이 있는지 확인해야합니다. pyspark에서 예외 : PYTHONHASHSEED를 통해 문자열 해시의 임의성을 비활성화해야합니까?를 참조하십시오.

  • 해시 파티 셔 너는 인젝 티브도 아니고 인젝션도 아닙니다. 단일 파티션에 여러 키를 할당 할 수 있으며 일부 파티션은 비워 둘 수 있습니다.

  • REPL 정의 된 케이스 클래스 ( Apache Spark의 Case 클래스 동등성) 와 결합 된 경우 현재 해시 기반 메소드는 Scala에서 작동하지 않습니다.

  • HashPartitioner (또는 다른 Partitioner )는 데이터를 HashPartitioner . 여러 작업간에 분할을 다시 사용하지 않으면 셔플 할 데이터 양이 줄어들지 않습니다.

HashPartitioner 문서를 읽었습니다. 불행히도 API 호출을 제외하고는 아무런 설명이 없습니다. HashPartitioner 가 키의 해시를 기반으로 분산 세트를 분할한다고 가정합니다. 예를 들어 내 데이터가

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

따라서 파티 셔 너는 이것을 동일한 파티션에 동일한 키가있는 다른 파티션에 넣습니다. 그러나 생성자 인수의 중요성을 이해하지 못합니다.

new HashPartitoner(numPartitions) //What does numPartitions do?

위의 데이터 세트의 경우 내가 한 경우 결과가 어떻게 다릅니 까

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

그렇다면 HashPartitioner 는 실제로 어떻게 작동합니까?


HashPartitioner.getPartition 메소드는 를 인수로 사용하여 키가 속한 파티션의 색인 을 리턴합니다. 파티 셔 너는 유효한 인덱스가 무엇인지 알아야하므로 올바른 범위의 숫자를 반환합니다. 파티션 수는 numPartitions 생성자 인수를 통해 지정됩니다.

구현은 대략 key.hashCode() % numPartitions 반환합니다. 자세한 내용은 Partitioner.scala 를 참조하십시오.





partitioning