scala - 차이 - spark 구조




spark에서 각 executor에 대해 한 번씩 작업을 수행하는 방법 (2)

나는 약 400MB 크기의 S3에 저장된 weka 모델을 가지고있다. 이제 모델을 실행하고 예측을 수행 할 레코드 집합이 있습니다.

예측을 수행하기 위해 내가 시도한 것은,

  1. 드라이버에서 모델을 다운로드하여 정적 객체로로드하고 모든 실행 프로그램에 브로드 캐스트합니다. 예측 RDD에서 맵 작업을 수행하십시오. ----> 예측을 수행하기 위해 Weka 에서처럼 작동하지 않습니다. 모델 객체를 수정하고 브로드 캐스팅하려면 읽기 전용 복사본이 필요합니다.

  2. 드라이버에서 모델을 다운로드하여 정적 객체로로드하고 각 맵 작업에서 실행 프로그램에 보냅니다. -----> Working (각지도 작업에서와 같이 효율적이지 않음, 400MB 객체를 전달 중임)

  3. 드라이버에서 모델을 다운로드하고 각 실행 프로그램에로드하고 캐시하십시오. (어떻게 해야할지 모르겠다)

누군가가 어떻게 각 Executor에서 모델을 한 번로드하고 캐시하여 다른 레코드에 대해 다시로드하지 않도록 할 수 있습니까?


게으른 초기화 프로그램보다 나에게 더 효과적이었던 것이있다. NULL로 초기화 된 오브젝트 레벨 포인터를 작성하고 각 실행 프로그램이 초기화하도록하십시오. 초기화 블록에서 한 번 실행 코드를 사용할 수 있습니다. 각 처리 배치는 로컬 변수를 재설정하지만 객체 수준의 변수는 재설정하지 않습니다.

object Thing1 {
  var bigObject : BigObject = null

  def main(args: Array[String]) : Unit = {
    val sc = <spark/scala magic here>
    sc.textFile(infile).map(line => {
      if (bigObject == null) {
         // this takes a minute but runs just once
         bigObject = new BigObject(parameters)  
      }
      bigObject.transform(line)
    })
  }
}

이 접근법은 다른 접근법의 파티션 당 하나의 큰 객체가 아니라 Executor 당 하나의 큰 객체를 생성합니다.

big bigObject : BigObject = null 을 주 함수 네임 스페이스 내에 넣으면 다르게 동작합니다. 이 경우, 각 파티션의 시작 부분 (즉 배치)에서 bigObject 생성자를 실행합니다. 메모리 누수가 발생하면 결국은 집행자를 죽일 것입니다. 가비지 수집은 또한 더 많은 작업을 수행해야합니다.


두 가지 옵션이 있습니다.

1. 데이터를 나타내는 lazy val로 싱글 톤 객체를 만듭니다.

    object WekaModel {
        lazy val data = {
            // initialize data here. This will only happen once per JVM process
        }
    }       

그런 다음 map 함수에서 lazy val을 사용할 수 있습니다. lazy val 은 각 작업자 JVM이 자신의 데이터 인스턴스를 초기화하도록합니다. data 대해 직렬화 또는 브로드 캐스트가 수행되지 않습니다.

    elementsRDD.map { element =>
        // use WekaModel.data here
    }

장점

  • JVM 인스턴스 당 한 번 데이터를 초기화 할 수 있기 때문에보다 효율적입니다. 이 접근법은 예를 들어 데이터베이스 연결 풀을 초기화해야 할 때 좋은 선택입니다.

단점

  • 초기화에 대한 제어가 적습니다. 예를 들어 런타임 매개 변수가 필요한 경우 객체를 초기화하는 것이 까다로울 수 있습니다.
  • 필요한 경우 실제로 해제하거나 해제 할 수 없습니다. 프로세스가 종료 될 때 OS가 리소스를 확보하기 때문에 대개 허용됩니다.

2. map 대신 RDD에서 mapPartition (또는 foreachPartition ) 메소드를 사용하십시오.

이렇게하면 전체 파티션에 필요한 모든 것을 초기화 할 수 있습니다.

    elementsRDD.mapPartition { elements =>
        val model = new WekaModel()

        elements.map { element =>
            // use model and element. there is a single instance of model per partition.
        }
    }

장점 :

  • 객체의 초기화 및 초기화 해제시 더 많은 유연성을 제공합니다.

단점

  • 각 파티션은 개체의 새 인스턴스를 만들고 초기화합니다. JVM 인스턴스 당 몇 개의 파티션이 있는지에 따라 문제가 될 수도 있고 아닐 수도 있습니다.




partitioning