apache-spark - 예제 - sqoop spark streaming




Spark에서 단계는 어떻게 작업으로 분할됩니까? (2)

내가 올바르게 이해하면 혼란스럽게하는 2 가지가 있습니다.

1) 작업 내용을 결정하는 것은 무엇입니까?

2) 실행할 작업 수는 어떻게 결정됩니까?

Spark의 엔진은 연속적인 rdd에서 간단한 작업을 "접착"합니다.

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

따라서 rdd3이 (lazily) 계산되면 spark는 rdd1의 파티션 당 태스크를 생성하고 각 태스크는 필터와 라인 당 맵을 모두 실행하여 rdd3을 생성합니다.

작업 수는 파티션 수에 따라 결정됩니다. 모든 RDD에는 정의 된 수의 파티션이 있습니다. HDFS에서 읽은 소스 RDD (예 : sc.textFile (...) 사용)의 경우 파티션 수는 입력 형식으로 생성 된 분할 수입니다. RDD에 대한 일부 작업으로 인해 파티션 수가 다른 RDD가 발생할 수 있습니다.

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

또 다른 예는 조인입니다.

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

파티션 수를 변경하는 (대부분) 작업에는 셔플이 포함됩니다.

rdd2 = rdd1.repartition( 1000 ) 

실제로 rdd1의 각 파티션에 대한 작업은 다음 단계에서 읽을 수있는 최종 출력을 생성하여 rdd2가 정확히 1000 개의 파티션을 갖도록해야합니다 ( Hash 또는 Sort ). 이 측면의 작업을 "맵 (측면) 작업"이라고도합니다. 나중에 rdd2에서 실행되는 태스크는 하나의 파티션 (rdd2!의)에서 작동하며 해당 파티션과 관련된 맵 측 출력을 읽고 / 결합하는 방법을 알아 내야합니다. 이 측면의 작업을 "감소 (측면) 작업"이라고도합니다.

두 가지 질문이 관련되어 있습니다. 단계의 작업 수는 파티션 수 (연속 rdd가 "접착 됨"에 공통 임)이고 rdd의 파티션 수는 단계 수 (파티션 수를 일부로 지정하여) 사이에서 변경 될 수 있습니다. 셔플 (예 :)).

스테이지 실행이 시작되면 해당 작업이 작업 슬롯을 차지할 수 있습니다. 동시 작업 슬롯 수는 numExecutors * ExecutorCores입니다. 일반적으로, 서로 다른 비 종속적 단계의 작업이이를 점유 할 수 있습니다.

다음은 한 시점에 하나의 Spark 작업 만 실행되고 있다고 가정합니다.

내가 지금까지 얻는 것

다음은 Spark에서 발생하는 것을 이해하는 것입니다.

  1. SparkContext 가 작성되면 각 작업자 노드가 실행기를 시작합니다. 실행기는 별도의 프로세스 (JVM)이며 드라이버 프로그램에 다시 연결됩니다. 각 실행 프로그램에는 드라이버 프로그램의 jar이 있습니다. 드라이버를 종료하면 실행 프로그램이 종료됩니다. 각 실행자는 일부 파티션을 보유 할 수 있습니다.
  2. 작업이 실행될 때 계보 그래프에 따라 실행 계획이 작성됩니다.
  3. 실행 작업은 여러 개의 (계보 그래프에서) 변환 및 작업을 포함하지만 셔플은없는 단계로 분할됩니다. 따라서 스테이지는 셔플로 분리됩니다.

나는 이해

  • 작업은 Function 객체를 직렬화하여 드라이버에서 실행기로 전송되는 명령입니다.
  • 실행 프로그램은 드라이버 jar를 사용하여 명령 (작업)을 deserialize하고 파티션에서 실행합니다.

그러나

질문

스테이지를 해당 작업으로 어떻게 분할합니까?

구체적으로 :

  1. 작업이 변환 및 작업에 의해 결정됩니까 아니면 작업에 여러 변환 / 작업이있을 수 있습니까?
  2. 작업이 파티션에 의해 결정됩니까 (예 : 파티션 당 단계 당 하나의 작업).
  3. 작업이 노드에 의해 결정됩니까 (예 : 노드 당 단계 당 하나의 작업)?

내가 생각하는 것 (올바른 경우에도 부분적인 대답 만)

https://0x0fff.com/spark-architecture-shuffle 에서 셔플은 이미지와 함께 설명됩니다

나는 규칙이

각 단계는 노드 수에 관계없이 # number-of-partitions 작업으로 나뉩니다.

첫 번째 이미지의 경우 3 개의 맵 작업과 3 개의 축소 작업이 있다고합니다.

0x0fff의 이미지의 경우 8 개의 맵 작업과 3 개의 축소 작업이 있습니다 (오렌지 3 개와 짙은 녹색 파일 3 개만 있다고 가정).

어떤 경우에도 공개 질문

그 맞습니까? 그러나 그것이 정확하더라도, 여러 작업 (예 : 여러 맵)이 하나의 작업 내에 있는지 또는 작업 당 하나의 작업으로 구분되는지 여부에 따라 여전히 열려 있기 때문에 위의 질문에 모두 대답하지는 않았습니다.

다른 사람들의 말

Spark의 작업은 무엇입니까? Spark 작업자는 jar 파일을 어떻게 실행합니까? Apache Spark 스케줄러는 어떻게 파일을 작업으로 분할합니까? 비슷하지만 내 질문에 명확하게 대답했다고 생각하지 않았습니다.


여기 꽤 좋은 개요가 있습니다. 질문에 대답하기 위해

  • stage 의 데이터 파티션마다 별도의 task 을 시작해야합니다. 각 파티션은 별도의 물리적 위치 (예 : HDFS의 블록 또는 로컬 파일 시스템의 디렉토리 / 볼륨)에있을 것입니다.

Stage 제출은 DAG Scheduler 가 주도합니다. 이는 상호 의존적이지 않은 단계가 병렬로 실행되도록 클러스터에 제출 될 수 있음을 의미합니다. 이는 클러스터의 병렬화 기능을 최대화합니다. 따라서 데이터 흐름의 작업이 동시에 발생할 수 있으면 여러 단계가 시작될 것으로 예상됩니다.

다음 장난감 예제에서 실제로 다음 유형의 작업을 수행하는 것을 볼 수 있습니다.

  • 두 개의 데이터 소스를로드
  • 두 데이터 소스에서 개별적으로 일부 맵 작업을 수행
  • 그들과 합류
  • 결과에 대한 일부 맵 및 필터 작업 수행
  • 결과를 저장

그렇다면 몇 단계를 거쳐야할까요?

  • 두 개의 데이터 소스를 병렬로로드하기위한 각각 1 단계 = 2 단계
  • 다른 두 단계에 종속 되는 join 을 나타내는 세 번째 단계
  • 참고 : 결합 된 데이터에 대한 모든 후속 작업은 순차적으로 수행되어야하므로 동일한 단계에서 수행 될 수 있습니다. 추가 작업을 시작하면 이전 작업이 완료 될 때까지 작업을 시작할 수 없으므로 추가 단계를 시작하면 이점이 없습니다.

그 장난감 프로그램입니다

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

그리고 여기에 결과의 DAG가 있습니다

이제 얼마나 많은 작업이 있습니까? 작업 수는 다음과 같아야합니다

#Partitions in the stage ( Stage * #Partitions in the stage )







apache-spark