apache spark - Comment optimiser le partitionnement lors de la migration de données depuis une source JDBC?




apache-spark hive (2)

J'essaie de déplacer des données d'une table de la table PostgreSQL vers une table Hive sur HDFS. Pour ce faire, j'ai créé le code suivant:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

Les données sont insérées dans la table de ruche partitionnées dynamiquement en fonction de prtn_String_columns: source_system_name, period_year, period_num

Spark-submit utilisé:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

Les messages d'erreur suivants sont générés dans les journaux de l'exécuteur:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

Je vois dans les journaux que la lecture est correctement exécutée avec le nombre de partitions indiqué ci-dessous:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

Ci-dessous l'état des exécuteurs en étapes:

Les données ne sont pas partitionnées correctement. Une partition est plus petite alors que l'autre devient énorme. Il y a un problème d'asymétrie ici. Lors de l'insertion des données dans la table Hive, le travail échoue à la ligne suivante: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") problème.

J'ai essayé d'augmenter le nombre d'exécuteurs, en augmentant la mémoire de l'exécuteur, la mémoire du pilote, en essayant simplement de sauvegarder en tant que fichier csv au lieu d'enregistrer le cadre de données dans une table Hive, mais rien n'empêche l'exécution de donner l'exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Y at-il quelque chose dans le code que je dois corriger? Quelqu'un pourrait-il me faire savoir comment puis-je résoudre ce problème?


  1. Déterminez le nombre de partitions dont vous avez besoin en fonction de la quantité de données d'entrée et de vos ressources de cluster. En règle générale, il est préférable de conserver l’entrée de la partition sous 1 Go sauf si cela est strictement nécessaire. et strictement inférieur à la limite de taille de bloc.

    Vous avez précédemment déclaré que vous migrez 1 To de valeurs de données que vous utilisez dans différentes publications (5 - 70) sont probablement très faibles pour assurer un processus fluide.

    Essayez d’utiliser une valeur qui ne nécessite pas de repartitioning supplémentaire.

  2. Connaissez vos données.

    Analysez les colonnes disponibles dans le jeu de données pour déterminer si des colonnes à cardinalité élevée et à distribution uniforme doivent être réparties entre le nombre souhaité de partitions. Ce sont de bons candidats pour un processus d'importation. De plus, vous devez déterminer une plage exacte de valeurs.

    Les agrégations avec différentes mesures de centralité et d'asymétrie, ainsi que des histogrammes et des comptages de base par clé sont de bons outils d'exploration. Pour cette partie, il est préférable d’analyser les données directement dans la base de données, au lieu de les récupérer dans Spark.

    En fonction du SGBDR, vous pourrez peut-être utiliser width_bucket (PostgreSQL, Oracle) ou une fonction équivalente pour avoir une idée lowerBound répartition des données dans Spark après le chargement avec partitionColumn , lowerBound , upperBound , numPartitons .

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
  3. Si aucune colonne ne satisfait aux critères ci-dessus, prenez en compte:

    • Créer un personnalisé et l'exposer via. une vue. Les hachages sur plusieurs colonnes indépendantes sont généralement de bons candidats. Veuillez consulter le manuel de votre base de données pour déterminer les fonctions pouvant être utilisées ici ( DBMS_CRYPTO dans Oracle, pgcrypto dans PostgreSQL) *.
    • L'utilisation d'un ensemble de colonnes indépendantes qui, prises ensemble, fournissent une cardinalité assez élevée.

      Si vous souhaitez écrire sur une table Hive partitionnée, vous pouvez éventuellement inclure des colonnes de partitionnement Hive. Cela pourrait limiter le nombre de fichiers générés plus tard.

  4. Préparer les arguments de partitionnement

    • Si la colonne sélectionnée ou créée aux étapes précédentes est numérique ( ou date / horodatage dans Spark> = 2.4 ), indiquez-la directement en tant que partitionColumn et utilisez les valeurs de plage déterminées précédemment pour remplir lowerBound et upperBound .

      Si les valeurs liées ne reflètent pas les propriétés des données ( min(col) pour lowerBound , max(col) pour upperBound ), il peut en résulter un upperBound significatif des données. Dans le pire des cas, lorsque les limites ne couvrent pas la plage de données, tous les enregistrements sont récupérés par une seule machine, ce qui ne vaut pas mieux que pas de partitionnement du tout.

    • Si la colonne sélectionnée dans les étapes précédentes est catégorique ou si un ensemble de colonnes génère une liste de prédicats s'excluant mutuellement qui couvrent entièrement les données, sous une forme utilisable dans une clause where SQL .

      Par exemple, si vous avez une colonne A avec les valeurs { a1 , a2 , a3 } et la colonne B avec les valeurs { b1 , b2 , b3 }:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"

      Vérifiez que les conditions ne se chevauchent pas et que toutes les combinaisons sont couvertes. Si ces conditions ne sont pas remplies, vous obtenez des doublons ou des enregistrements manquants.

      Passer des données en tant qu'argument de predicates à l'appel jdbc . Notez que le nombre de partitions sera exactement égal au nombre de prédicats.

  5. Mettez la base de données en mode lecture seule (toute écriture en cours peut entraîner une incohérence dans les données. Si possible, verrouillez la base de données avant de démarrer tout le processus, mais si cela n’est pas possible, dans votre organisation).

  6. Si le nombre de partitions correspond aux données de charge de sortie souhaitées sans repartition et dump directement sur le récepteur, vous pouvez sinon essayer de repartitionner en suivant les mêmes règles qu'à l'étape 1.

  7. Si vous rencontrez toujours des problèmes, assurez-vous que vous avez correctement configuré la mémoire Spark et les options du CPG.

  8. Si aucune de ces solutions ne fonctionne:

    • Envisagez de transférer vos données sur un réseau / distribue le stockage à l'aide d'outils tels que COPY TO et lisez-le directement à partir de cet emplacement.

      Notez que pour les utilitaires de base de données standard, vous aurez généralement besoin d'un système de fichiers compatible POSIX. HDFS ne le fera généralement pas.

      L'avantage de cette approche est que vous n'avez pas à vous soucier des propriétés de la colonne et qu'il n'est pas nécessaire de mettre les données en mode lecture seule pour assurer la cohérence.

    • Utiliser des outils de transfert en bloc dédiés, tels qu'Apache Sqoop, puis remodeler les données.

* N'utilisez pas de pseudocolonnes - Pseudocolumn dans Spark JDBC .


D'après mon expérience, il existe 4 types de paramètres de mémoire qui font la différence:

A) [1] Mémoire pour stocker des données pour des raisons de traitement VS [2] Heap Space pour contenir la pile de programmes

B) [1] Pilote VS [2] mémoire exécuteur

Jusqu'à présent, j'ai toujours réussi à faire fonctionner mes travaux Spark en augmentant le type de mémoire approprié:

A2-B1 aurait donc la mémoire disponible sur le pilote pour contenir la pile de programmes. Etc.

Les noms de propriété sont les suivants:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

N'oubliez pas que la somme de tous les éléments * -B1 doit être inférieure à la mémoire disponible sur vos travailleurs et que la somme de tous les éléments * -B2 doit être inférieure à la mémoire de votre nœud de pilote.

Mon pari serait que le coupable est l’un des paramètres de tas hardiment marqués.





partitioning