scala - write - 스파크 파티션
HashPartitioner는 어떻게 작동합니까? (2)
데이터 셋을 조금 더 흥미롭게 만들자.
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
우리는 6 가지 요소를 가지고 있습니다 :
rdd.count
Long = 6
파티 셔너 없음 :
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
그리고 8 개의 파티션 :
rdd.partitions.length
Int = 8
이제 작은 도우미를 정의하여 파티션 당 요소 수를 계산할 수 있습니다.
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
파티 셔 너가 없기 때문에 데이터 세트가 파티션간에 균일하게 분배됩니다 ( Spark의 기본 파티션 구성표 ).
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
이제 데이터 셋을 다시 파티션하자 :
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
HashPartitioner
전달 된 매개 변수는 파티션 수를 정의하므로 하나의 파티션이 필요합니다.
rddOneP.partitions.length
Int = 1
파티션이 하나뿐이므로 모든 요소가 포함됩니다.
countByPartition(rddOneP).collect
Array[Int] = Array(6)
셔플 후 값의 순서는 비 결정적입니다.
HashPartitioner(2)
사용하는 경우와 동일한 방법
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
우리는 2 개의 파티션을 얻을 것이다 :
rddTwoP.partitions.length
Int = 2
rdd
는 키 데이터로 분할되므로 더 이상 균일하게 배포되지 않습니다.
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
세 개의 키와 두 개의 다른
hashCode
mod
numPartitions
여기에는 예상치 못한 것이 없습니다.
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
위의 내용을 확인하십시오.
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
마지막으로
HashPartitioner(7)
를 사용하여 7 개의 파티션을 얻습니다. 각각 3 개의 비어 있지 않은 2 개의 요소가 있습니다.
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
요약 및 메모
-
HashPartitioner
는 파티션 수를 정의하는 단일 인수를 사용합니다. -
값은
hash
키를 사용하여 파티션에 할당됩니다.hash
함수는 언어에 따라 다를 수 있습니다 (Scala RDD는hashCode
사용하고DataSets
은 MurmurHash 3, PySpark,portable_hash
).키가 작은 정수인 경우와 같이 간단한 경우,
hash
는 ID (i = hash(i)
)라고 가정 할 수 있습니다.스칼라 API는
nonNegativeMod
를 사용하여 계산 된 해시를 기반으로 파티션을 결정합니다. -
키 분배가 균일하지 않은 경우 클러스터의 일부가 유휴 상태 인 상황에서 종료 될 수 있습니다.
-
키는 해시 가능해야합니다. PySpark의 reduceByKey 가 PySpark 특정 문제에 대해 읽을 수 있도록 A 목록에 대한 답변을 확인할 수 있습니다. 또 다른 가능한 문제는 HashPartitioner 설명서에서 강조됩니다.
Java 배열에는 해당 내용이 아닌 배열의 ID를 기반으로하는 hashCode가 있으므로 HashPartitioner를 사용하여 RDD [Array [ ]] 또는 RDD [(Array [ ], _)]를 분할하려고하면 예기치 않은 결과가 발생합니다.
-
파이썬 3에서는 해싱이 일관성이 있는지 확인해야합니다. pyspark에서 예외 : PYTHONHASHSEED를 통해 문자열 해시의 임의성을 비활성화해야합니까?를 참조하십시오.
-
해시 파티 셔 너는 인젝 티브도 아니고 인젝션도 아닙니다. 단일 파티션에 여러 키를 할당 할 수 있으며 일부 파티션은 비워 둘 수 있습니다.
-
REPL 정의 된 케이스 클래스 ( Apache Spark의 Case 클래스 동등성) 와 결합 된 경우 현재 해시 기반 메소드는 Scala에서 작동하지 않습니다.
-
HashPartitioner
(또는 다른Partitioner
)는 데이터를HashPartitioner
. 여러 작업간에 분할을 다시 사용하지 않으면 셔플 할 데이터 양이 줄어들지 않습니다.
HashPartitioner
문서를 읽었습니다.
불행히도 API 호출을 제외하고는 아무런 설명이 없습니다.
HashPartitioner
가 키의 해시를 기반으로 분산 세트를 분할한다고 가정합니다.
예를 들어 내 데이터가
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
따라서 파티 셔 너는 이것을 동일한 파티션에 동일한 키가있는 다른 파티션에 넣습니다. 그러나 생성자 인수의 중요성을 이해하지 못합니다.
new HashPartitoner(numPartitions) //What does numPartitions do?
위의 데이터 세트의 경우 내가 한 경우 결과가 어떻게 다릅니 까
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
그렇다면
HashPartitioner
는 실제로 어떻게 작동합니까?
HashPartitioner.getPartition
메소드는
키
를 인수로 사용하여 키가 속한 파티션의
색인
을 리턴합니다.
파티 셔 너는 유효한 인덱스가 무엇인지 알아야하므로 올바른 범위의 숫자를 반환합니다.
파티션 수는
numPartitions
생성자 인수를 통해 지정됩니다.
구현은 대략
key.hashCode() % numPartitions
반환합니다.
자세한 내용은
Partitioner.scala
를 참조하십시오.