scala - tutorial - 아파치 스파크




Spark 실행자의 객체 캐시 (2)

스파크 전문가에게 좋은 질문입니다.

map 작업 (RDD)에서 데이터를 처리 중입니다. mapper 함수 내에서 RDD의 요소 처리에 사용되는 클래스 A 객체를 조회해야합니다.

이것은 Executor에서 수행되고 A 유형의 요소 (조회 될 것입니다)의 작성은 비용이 많이 드는 작업이므로, 각 실행 프로그램에서이 오브젝트를 미리로드하고 캐시하고 싶습니다. 그것을하는 가장 좋은 방법은 무엇입니까?

  • 하나의 아이디어는 룩업 테이블을 방송하는 것이지만, 클래스 A 는 직렬화 가능하지 않다 (구현에 대한 제어가 없다).

  • 또 다른 아이디어는 싱글 톤 객체에로드하는 것입니다. 그러나, 나는 그 룩업 테이블에로드되는 것을 제어하려고합니다 (예를 들어 다른 Spark 작업의 다른 데이터).

이상적으로는, 스트리밍의 경우를 포함하여 실행 프로그램에서로드 될 내용을 한 번 지정하여 (시작 전 또는 이전에 드라이버에서 사용할 수있는 매개 변수를 통해 조회 테이블이 배치 사이의 메모리에 유지되도록) 데이터가 처리됩니다.

깨끗하고 품위 있고 우아한 방법이 있습니까? 아니면 달성하기가 불가능합니까?


이것은 정확히 broadcast. 으로 사용되는 사례입니다 broadcast. 방송 된 변수는 한 번 전송되고 급류를 사용하여 모든 집행자에게 효율적으로 이동하고 더 이상 필요하지 않을 때까지 메모리 / 로컬 디스크에 머 무르십시오.

직렬화는 다른 사람의 인터페이스를 사용할 때 종종 문제로 나타난다. 소비하는 객체가 직렬화 가능하다는 것을 강요 할 수 있다면, 이것이 최선의 해결책이 될 것입니다. 이것이 불가능하면 인생은 조금 더 복잡해집니다. A 오브젝트를 직렬화 할 수 없으면 각 태스크의 실행 프로그램에서 오브젝트를 작성해야합니다. 그들이 어딘가에 파일에 저장되어 있다면, 이것은 다음과 같이 보입니다 :

rdd.mapPartitions { it => 
  val lookupTable = loadLookupTable(path)
  it.map(elem => fn(lookupTable, elem))
}

이 모델을 사용하는 경우 작업마다 조회 테이블을 한 번로드해야하므로 브로드 캐스트 변수의 교차 작업 지속성을 활용할 수는 없습니다.

편집 : JVM 당 작업을 통해 조회 테이블을 공유 할 수 있다고 믿는 다른 모델이 있습니다.

class BroadcastableLookupTable {
  @transient val lookupTable: LookupTable[A] = null

  def get: LookupTable[A] = {
    if (lookupTable == null)
      lookupTable = < load lookup table from disk>
    lookupTable
  }
}

이 클래스는 브로드 캐스트 될 수 있고 (실질적인 것은 전송되지 않습니다) JVM 당 처음 호출되면 룩업 테이블을로드하여 리턴합니다.


직렬화가 불가능할 경우, 조회 객체를 데이터베이스에 저장하는 방법은 무엇입니까? 그것은 가장 쉬운 해결책은 아니지만, 잘 작동해야합니다. eg spark-redis 확인하는 것이 좋습니다. 그러나 거기에는 더 나은 해결책이 있다고 확신합니다.





apache-spark