apache-spark - 예제 - sqoop spark streaming
Spark에서 단계는 어떻게 작업으로 분할됩니까? (2)
내가 올바르게 이해하면 혼란스럽게하는 2 가지가 있습니다.
1) 작업 내용을 결정하는 것은 무엇입니까?
2) 실행할 작업 수는 어떻게 결정됩니까?
Spark의 엔진은 연속적인 rdd에서 간단한 작업을 "접착"합니다.
rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count
따라서 rdd3이 (lazily) 계산되면 spark는 rdd1의 파티션 당 태스크를 생성하고 각 태스크는 필터와 라인 당 맵을 모두 실행하여 rdd3을 생성합니다.
작업 수는 파티션 수에 따라 결정됩니다. 모든 RDD에는 정의 된 수의 파티션이 있습니다. HDFS에서 읽은 소스 RDD (예 : sc.textFile (...) 사용)의 경우 파티션 수는 입력 형식으로 생성 된 분할 수입니다. RDD에 대한 일부 작업으로 인해 파티션 수가 다른 RDD가 발생할 수 있습니다.
rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
또 다른 예는 조인입니다.
rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).
파티션 수를 변경하는 (대부분) 작업에는 셔플이 포함됩니다.
rdd2 = rdd1.repartition( 1000 )
실제로 rdd1의 각 파티션에 대한 작업은 다음 단계에서 읽을 수있는 최종 출력을 생성하여 rdd2가 정확히 1000 개의 파티션을 갖도록해야합니다 ( Hash 또는 Sort ). 이 측면의 작업을 "맵 (측면) 작업"이라고도합니다. 나중에 rdd2에서 실행되는 태스크는 하나의 파티션 (rdd2!의)에서 작동하며 해당 파티션과 관련된 맵 측 출력을 읽고 / 결합하는 방법을 알아 내야합니다. 이 측면의 작업을 "감소 (측면) 작업"이라고도합니다.
두 가지 질문이 관련되어 있습니다. 단계의 작업 수는 파티션 수 (연속 rdd가 "접착 됨"에 공통 임)이고 rdd의 파티션 수는 단계 수 (파티션 수를 일부로 지정하여) 사이에서 변경 될 수 있습니다. 셔플 (예 :)).
스테이지 실행이 시작되면 해당 작업이 작업 슬롯을 차지할 수 있습니다. 동시 작업 슬롯 수는 numExecutors * ExecutorCores입니다. 일반적으로, 서로 다른 비 종속적 단계의 작업이이를 점유 할 수 있습니다.
다음은 한 시점에 하나의 Spark 작업 만 실행되고 있다고 가정합니다.
내가 지금까지 얻는 것
다음은 Spark에서 발생하는 것을 이해하는 것입니다.
-
SparkContext
가 작성되면 각 작업자 노드가 실행기를 시작합니다. 실행기는 별도의 프로세스 (JVM)이며 드라이버 프로그램에 다시 연결됩니다. 각 실행 프로그램에는 드라이버 프로그램의 jar이 있습니다. 드라이버를 종료하면 실행 프로그램이 종료됩니다. 각 실행자는 일부 파티션을 보유 할 수 있습니다. - 작업이 실행될 때 계보 그래프에 따라 실행 계획이 작성됩니다.
- 실행 작업은 여러 개의 (계보 그래프에서) 변환 및 작업을 포함하지만 셔플은없는 단계로 분할됩니다. 따라서 스테이지는 셔플로 분리됩니다.
나는 이해
- 작업은 Function 객체를 직렬화하여 드라이버에서 실행기로 전송되는 명령입니다.
- 실행 프로그램은 드라이버 jar를 사용하여 명령 (작업)을 deserialize하고 파티션에서 실행합니다.
그러나
질문
스테이지를 해당 작업으로 어떻게 분할합니까?
구체적으로 :
- 작업이 변환 및 작업에 의해 결정됩니까 아니면 작업에 여러 변환 / 작업이있을 수 있습니까?
- 작업이 파티션에 의해 결정됩니까 (예 : 파티션 당 단계 당 하나의 작업).
- 작업이 노드에 의해 결정됩니까 (예 : 노드 당 단계 당 하나의 작업)?
내가 생각하는 것 (올바른 경우에도 부분적인 대답 만)
https://0x0fff.com/spark-architecture-shuffle 에서 셔플은 이미지와 함께 설명됩니다
나는 규칙이
각 단계는 노드 수에 관계없이 # number-of-partitions 작업으로 나뉩니다.
첫 번째 이미지의 경우 3 개의 맵 작업과 3 개의 축소 작업이 있다고합니다.
0x0fff의 이미지의 경우 8 개의 맵 작업과 3 개의 축소 작업이 있습니다 (오렌지 3 개와 짙은 녹색 파일 3 개만 있다고 가정).
어떤 경우에도 공개 질문
그 맞습니까? 그러나 그것이 정확하더라도, 여러 작업 (예 : 여러 맵)이 하나의 작업 내에 있는지 또는 작업 당 하나의 작업으로 구분되는지 여부에 따라 여전히 열려 있기 때문에 위의 질문에 모두 대답하지는 않았습니다.
다른 사람들의 말
Spark의 작업은 무엇입니까? Spark 작업자는 jar 파일을 어떻게 실행합니까? Apache Spark 스케줄러는 어떻게 파일을 작업으로 분할합니까? 비슷하지만 내 질문에 명확하게 대답했다고 생각하지 않았습니다.
여기 꽤 좋은 개요가 있습니다. 질문에 대답하기 위해
-
각
stage
의 데이터 파티션마다 별도의task
을 시작해야합니다. 각 파티션은 별도의 물리적 위치 (예 : HDFS의 블록 또는 로컬 파일 시스템의 디렉토리 / 볼륨)에있을 것입니다.
Stage
제출은
DAG Scheduler
가 주도합니다.
이는 상호 의존적이지 않은 단계가 병렬로 실행되도록 클러스터에 제출 될 수 있음을 의미합니다. 이는 클러스터의 병렬화 기능을 최대화합니다.
따라서 데이터 흐름의 작업이 동시에 발생할 수 있으면 여러 단계가 시작될 것으로 예상됩니다.
다음 장난감 예제에서 실제로 다음 유형의 작업을 수행하는 것을 볼 수 있습니다.
- 두 개의 데이터 소스를로드
- 두 데이터 소스에서 개별적으로 일부 맵 작업을 수행
- 그들과 합류
- 결과에 대한 일부 맵 및 필터 작업 수행
- 결과를 저장
그렇다면 몇 단계를 거쳐야할까요?
- 두 개의 데이터 소스를 병렬로로드하기위한 각각 1 단계 = 2 단계
-
다른 두 단계에
종속
되는
join
을 나타내는 세 번째 단계 - 참고 : 결합 된 데이터에 대한 모든 후속 작업은 순차적으로 수행되어야하므로 동일한 단계에서 수행 될 수 있습니다. 추가 작업을 시작하면 이전 작업이 완료 될 때까지 작업을 시작할 수 없으므로 추가 단계를 시작하면 이점이 없습니다.
그 장난감 프로그램입니다
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")
그리고 여기에 결과의 DAG가 있습니다
이제 얼마나 많은 작업이 있습니까? 작업 수는 다음과 같아야합니다
#Partitions in the stage
(
Stage
*
#Partitions in the stage
)