apache-spark - read - spark to dataframe




JDBC 소스에서 데이터를 마이그레이션 할 때 파티셔닝을 최적화하는 방법은 무엇입니까? (2)

PostgreSQL 테이블의 테이블에서 HDFS의 하이브 테이블로 데이터를 이동하려고합니다. 이를 위해 다음 코드를 작성했습니다.

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

데이터는 prtn_String_columns: source_system_name, period_year, period_num 따라 동적으로 분할 된 하이브 테이블에 삽입됩니다 prtn_String_columns: source_system_name, period_year, period_num

사용 된 스파크 제출 :

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

실행자 로그에 다음과 같은 오류 메시지가 생성됩니다.

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

로그에 다음과 같이 지정된 개수의 파티션으로 읽기가 제대로 실행되고 있음을 알 수 있습니다.

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

다음은 유언 집행자들의 상태입니다.

데이터가 올바르게 분할되지 않았습니다. 하나의 파티션은 작아지고 다른 파티션은 거대 해집니다. 여기에 왜곡 문제가 있습니다. 데이터를 하이브 테이블에 삽입하는 동안 spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") 작업이 실패하지만 데이터 비뚤어 짐 때문에이 문제가 발생하는 것으로 알고 있습니다. 문제.

나는 executor의 수를 늘리려고 executor memory, driver memory를 증가 시키려고 시도했지만, 단지 하이브 테이블에 dataframe을 저장하는 대신 csv 파일로 저장하려고 시도했지만 예외는 예외적으로 실행에 영향을주지 않습니다.

java.lang.OutOfMemoryError: GC overhead limit exceeded

코드에서 수정해야 할 것이 있습니까? 아무도 내가이 문제를 어떻게 해결할 수 있는지 알려줄 수 있습니까?


  1. 입력 데이터의 양과 클러스터 리소스를 고려하여 필요한 파티션 수를 결정하십시오. 일반적으로 엄밀히 말하면 파티션 입력을 1GB 미만으로 유지하는 것이 좋습니다. 블록 크기 제한보다 엄격하게 작습니다.

    이전 에 다른 게시물 (5 - 70)에서 사용하는 1TB의 데이터 값을 마이그레이션하면 원활하게 처리 할 수있는 가능성이 낮습니다.

    추가 repartitioning 필요로하지 않는 값을 사용하십시오.

  2. 귀하의 데이터를 아십시오.

    데이터 세트에서 사용할 수있는 열을 분석하여 카디널리티가 높고 균일 한 분포를 가진 열이 원하는 수의 파티션에 분산되도록 결정합니다. 이들은 수입 과정의 좋은 후보자입니다. 또한 정확한 값 범위를 결정해야합니다.

    다른 중심성과 왜도 측정뿐만 아니라 히스토그램과 기본 계수는 집계가 좋은 탐색 도구입니다. 이 부분에서는 Spark에 가져 오는 대신 데이터베이스에서 직접 데이터를 분석하는 것이 좋습니다.

    RDBMS에 따라 partitionColumn , lowerBound , upperBound , numPartitons 하여로드 한 후 Spark에서 데이터가 분산되는 방법에 대한 자세한 정보를 얻으려면 width_bucket (PostgreSQL, Oracle) 또는 이와 동등한 함수를 사용할 수 있습니다.

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
  3. 위의 기준을 만족하는 열이 없다면 다음을 고려하십시오.

    • 커스텀 원 작성 및 노출. 관점. 일반적으로 여러 개의 독립적 인 열에 대한 해시가 적합합니다. 여기에서 사용할 수있는 기능을 결정하려면 데이터베이스 설명서를 참조하십시오 (Oracle의 DBMS_CRYPTO , PostgreSQL의 pgcrypto ) *.
    • 함께 사용 된 독립적 인 열 집합을 사용하여 충분히 높은 카디널리티를 제공합니다.

      선택적으로 파티션 된 하이브 테이블에 쓰려면 하이브 파티션 컬럼을 포함시켜야합니다. 나중에 생성되는 파일 수를 제한 할 수 있습니다.

  4. 파티셔닝 인수 준비하기

    • 이전 단계에서 선택하거나 만든 열이 숫자 ( 또는 Spark> = 2.4의 날짜 / 시간 소인 ) 인 경우 partitionColumn 직접 제공하고 lowerBoundupperBound 를 채우기 전에 결정된 범위 값을 사용하십시오.

      바운드 값이 데이터의 특성 ( lowerBound 경우 max(col) , upperBound 경우 max(col) 을 반영하지 않으면 스레드가 신중하게 처리되므로 중요한 데이터가 왜곡 될 수 있습니다. 최악의 시나리오에서는 경계가 데이터 범위를 다루지 않을 때 모든 레코드가 단일 시스템에서 페치 (fetch)되므로 전혀 분할하지 않는 것이 낫습니다.

    • 이전 단계에서 선택된 C 럼이 + 유형이거나 C 럼 세트가 SQL where 절에서 g 용할 수있는 형식으로 데이터를 완전히 포함하는 상호 h 타적 술어리스트를 생성 할 경우.

      예를 들어, 값이 { a1 , a2 , a3 } 인 열 B 와 값이 { b1 , b2 , b3 } 인 열 B 있는 경우 :

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"

      조건이 겹치지 않고 모든 조합이 적용되는지 다시 확인하십시오. 이러한 조건이 충족되지 않으면 중복되거나 누락 된 레코드로 끝납니다.

      데이터를 predicates 인수로 jdbc 호출에 전달하십시오. 파티션의 수는 술어의 수와 정확히 같을 것입니다.

  5. 데이터베이스를 읽기 전용 모드로 설정하십시오 (진행중인 쓰기는 데이터 불일치를 유발할 수 있습니다. 가능하면 전체 프로세스를 시작하기 전에 데이터베이스를 잠 가야하지만 가능하지 않은 경우 조직에서).

  6. 파티션 수가 원하는 출력로드 데이터와 일치하는 경우 repartition 하지 않고 싱크에 직접 덤프하십시오. 그렇지 않으면 1 단계에서와 동일한 규칙에 따라 다시 분할을 시도 할 수 있습니다.

  7. 그래도 문제가 발생하면 스파크 메모리 및 GC 옵션을 올바르게 구성했는지 확인하십시오.

  8. 위 작업 중 어느 것도 작동하지 않는 경우 :

    • 데이터를 네트워크에 덤프하거나 COPY TO 와 같은 도구를 사용하여 저장소를 배포하고 거기에서 직접 읽는 것이 좋습니다.

      POSIX 호환 파일 시스템이 필요하기 때문에 일반적으로 HDFS는 그렇게하지 않을 것입니다.

      이 접근법의 장점은 열 속성에 대해 걱정할 필요가 없으며 데이터를 읽기 전용 모드로 두어 일관성을 유지할 필요가 없다는 것입니다.

    • Apache Sqoop과 같은 전용 대량 전송 도구를 사용하고 나중에 데이터를 다시 형상화합니다.

Spark JDBC에서 pseudocolumns - Pseudocolumn을 사용 하지 마십시오 .


내 경험에는 차이를 만드는 4 가지 메모리 설정이 있습니다.

A) [1] 처리를 위해 데이터를 저장하는 메모리 VS [2] 프로그램 스택을 보유하기위한 힙 공간

B) [1] 드라이버 VS [2] 실행 메모리

지금까지 필자는 항상 적절한 종류의 메모리를 늘려 Spark 작업을 성공적으로 실행할 수있었습니다.

A2-B1은 드라이버가 프로그램 스택을 유지할 수 있도록 메모리를 사용할 수있게합니다. 기타.

속성 이름은 다음과 같습니다.

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

모든 * -B1의 합계는 작업자의 사용 가능한 메모리보다 작아야하며 모든 * -B2의 합이 드라이버 노드의 메모리보다 작아야합니다.

내 범인은 범인이 대담하게 표시된 힙 설정 중 하나라는 것입니다.





partitioning