apache-spark - write - using org apache spark sql jdbc




Wie kann die Partitionierung bei der Migration von Daten aus JDBC-Quellen optimiert werden? (2)

Ich versuche, Daten aus einer Tabelle in der PostgreSQL-Tabelle in eine Hive-Tabelle in HDFS zu verschieben. Dazu habe ich mir folgenden Code ausgedacht:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

Die Daten werden basierend auf prtn_String_columns: source_system_name, period_year, period_num dynamisch partitioniert in die Hive-Tabelle prtn_String_columns: source_system_name, period_year, period_num

Spark-Submit verwendet:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

Die folgenden Fehlermeldungen werden in den Executor-Protokollen generiert:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

Ich sehe in den Protokollen, dass der Lesevorgang mit der angegebenen Anzahl von Partitionen wie folgt ordnungsgemäß ausgeführt wird:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

Nachfolgend sehen Sie den Status der Testamentsvollstrecker in Stufen:

Die Daten werden nicht richtig partitioniert. Eine Partition ist kleiner, während die andere sehr groß wird. Hier liegt ein Schräglaufproblem vor. Beim Einfügen der Daten in die Hive-Tabelle schlägt der Job in der folgenden Zeile fehl: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") aber ich verstehe, dass dies aufgrund des Datenversatzes geschieht Problem.

Ich habe versucht, die Anzahl der Executoren zu erhöhen, den Executor-Speicher und den Treiberspeicher zu vergrößern. Ich habe versucht, den Datenframe nur als CSV-Datei zu speichern, anstatt ihn in einer Hive-Tabelle zu speichern.

java.lang.OutOfMemoryError: GC overhead limit exceeded

Enthält der Code irgendetwas, das ich korrigieren muss? Kann mir jemand mitteilen, wie ich dieses Problem beheben kann?


  1. Bestimmen Sie, wie viele Partitionen Sie angesichts der Menge der Eingabedaten und Ihrer Clusterressourcen benötigen. Als Faustregel gilt, dass Partitionseingaben unter 1 GB bleiben sollten, sofern dies nicht unbedingt erforderlich ist. und streng kleiner als die Blockgrößenbegrenzung.

    Sie haben zuvor angegeben, dass die Migration von 1 TB Datenwerten, die Sie in verschiedenen Posts (5 - 70) verwenden, wahrscheinlich zu niedrig ist, um einen reibungslosen Prozess zu gewährleisten.

    Versuchen Sie, einen Wert zu verwenden, der keine weitere repartitioning erfordert.

  2. Kennen Sie Ihre Daten.

    Analysieren Sie die im Dataset verfügbaren Spalten, um festzustellen, ob Spalten mit hoher Kardinalität und gleichmäßiger Verteilung auf die gewünschte Anzahl von Partitionen verteilt werden sollen. Dies sind gute Kandidaten für einen Importprozess. Zusätzlich sollten Sie einen genauen Wertebereich festlegen.

    Aggregationen mit unterschiedlichen Zentralitäts- und Skewness-Maßen sowie Histogramme und Basis-Count-by-Keys sind gute Explorationswerkzeuge. Für diesen Teil ist es besser, Daten direkt in der Datenbank zu analysieren, anstatt sie an Spark abzurufen.

    Je nach RDBMS können Sie width_bucket (PostgreSQL, Oracle) oder eine gleichwertige Funktion verwenden, um eine anständige Vorstellung davon zu erhalten, wie Daten in Spark nach dem Laden mit partitionColumn , lowerBound , upperBound , numPartitons .

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
  3. Wenn es keine Spalten gibt, die die oben genannten Kriterien erfüllen, berücksichtigen Sie Folgendes:

    • Erstellen Sie ein benutzerdefiniertes und machen Sie es über verfügbar. eine Sicht. Hashes über mehrere unabhängige Spalten sind normalerweise gute Kandidaten. DBMS_CRYPTO zu den Funktionen, die hier verwendet werden können, finden Sie in Ihrem Datenbankhandbuch ( DBMS_CRYPTO in Oracle, pgcrypto in PostgreSQL) *.
    • Die Verwendung mehrerer unabhängiger Spalten zusammen ergibt eine ausreichende Kardinalität.

      Wenn Sie in eine partitionierte Hive-Tabelle schreiben möchten, sollten Sie optional die Hive-Partitionierungsspalten einbeziehen. Dies kann die Anzahl der später erzeugten Dateien begrenzen.

  4. Bereiten Sie Partitionierungsargumente vor

    • Wenn die in den vorherigen Schritten ausgewählte oder erstellte Spalte numerisch ist ( oder Datum / Zeitstempel in Spark> = 2.4 ), geben Sie sie direkt als partitionColumn und verwenden Sie die zuvor festgelegten Bereichswerte, um lowerBound und upperBound zu füllen.

      Wenn gebundene Werte nicht die Eigenschaften von Daten widerspiegeln ( min(col) für lowerBound , max(col) für upperBound ), kann dies zu einer erheblichen Datenverschiebung führen. Im schlimmsten Fall, wenn Grenzen den Datenbereich nicht abdecken, werden alle Datensätze von einem einzelnen Computer abgerufen, sodass es nicht besser als gar keine Partitionierung gibt.

    • Wenn die in den vorherigen Schritten ausgewählte Spalte kategorial ist oder aus einer Reihe von Spalten besteht, erstellen Sie eine Liste sich gegenseitig ausschließender Prädikate, die die Daten vollständig abdecken, in einer Form, die in einer SQL WHERE-Klausel verwendet werden kann.

      Wenn Sie beispielsweise eine Spalte A mit den Werten { a1 , a2 , a3 } und Spalte B mit den Werten { b1 , b2 , b3 } haben:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"

      Stellen Sie sicher, dass sich die Bedingungen nicht überschneiden und alle Kombinationen abgedeckt sind. Wenn diese Bedingungen nicht erfüllt sind, erhalten Sie Duplikate bzw. fehlende Datensätze.

      jdbc Daten als predicates an den jdbc Aufruf. Beachten Sie, dass die Anzahl der Partitionen genau der Anzahl der Vergleichselemente entspricht.

  5. Versetzen Sie die Datenbank in einen schreibgeschützten Modus (alle laufenden Schreibvorgänge können zu Dateninkonsistenzen führen. Wenn möglich, sollten Sie die Datenbank sperren, bevor Sie den gesamten Prozess starten, aber wenn dies in Ihrer Organisation nicht möglich ist).

  6. Wenn die Anzahl der Partitionen mit den gewünschten Ausgabeladedaten übereinstimmt, ohne sie neu zu repartition und sie direkt in die Senke abzulegen, können Sie, falls nicht, versuchen, nach den gleichen Regeln wie in Schritt 1 neu zu partitionieren.

  7. Wenn weiterhin Probleme auftreten, stellen Sie sicher, dass Sie die Spark-Speicher- und GC-Optionen ordnungsgemäß konfiguriert haben.

  8. Wenn keines der oben genannten Verfahren funktioniert:

    • Ziehen Sie in Betracht, Ihre Daten in einem Netzwerk COPY TO / Speicher mit Tools wie COPY TO verteilen und sie direkt von dort aus zu lesen.

      Beachten Sie, dass Sie für Standarddatenbank-Dienstprogramme normalerweise ein POSIX-kompatibles Dateisystem benötigen, sodass HDFS dies normalerweise nicht tut.

      Der Vorteil dieses Ansatzes besteht darin, dass Sie sich keine Gedanken über die Spalteneigenschaften machen müssen und keine Daten in einen schreibgeschützten Modus versetzt werden müssen, um die Konsistenz zu gewährleisten.

    • Verwenden dedizierter Tools für den Massentransfer wie Apache Sqoop und anschließendes Umformen von Daten.

* Verwenden Sie keine Pseudospalten - Pseudospalten in Spark JDBC .


Es gab eine andere Frage von Ihnen, die hier als Duplikat weitergeleitet wurde

 'How to avoid data skewing while reading huge datasets or tables into spark? 
  The data is not being partitioned properly. One partition is smaller while the 
  other one becomes huge on read.
  I observed that one of the partition has nearly 2million rows and 
  while inserting there is a skew in partition. '

Wenn das Problem darin besteht, mit Daten umzugehen, die nach dem Lesen in einem Datenframe partitioniert wurden, haben Sie versucht, den Wert für "numPartitions" zu erhöhen?

.option("numPartitions",50)

lowerBound, upperBound Formularbereichsschritte für generierte WHERE-Klauselausdrücke und num-Partitionen bestimmen die Anzahl der Teilungen.

Angenommen, eine Tabelle hat eine Spalten-ID (wir wählen diese als partitionColumn ). Der Wertebereich, den wir in der Tabelle für die Spalten- ID von 1 bis 1000 und wir möchten alle Datensätze durch Ausführen von select * from sometable , also gehen wir mit lowerbound = 1 & upperbound = 1000 und numpartition = 4

Dadurch wird ein Datenrahmen mit 4 Partitionen mit dem Ergebnis jeder Abfrage erstellt, indem SQL auf der Grundlage unseres Feeds erstellt wird (lowerbound = 1 & upperbound = 1000 and numpartition = 4)

select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750

Was ID(500,750) wenn die meisten Datensätze in unserer Tabelle in den ID(500,750) Bereich fallen ID(500,750) Das ist die Situation, in der Sie sich befinden.

Wenn wir die Anzahl der Partitionen erhöhen, geschieht die Aufteilung noch weiter und reduziert das Volumen der Datensätze in derselben Partition, aber dies ist keine gute Einstellung.

Anstatt die partitioncolumn anhand der von uns angegebenen Grenzen aufzuteilen, können Daten gleichmäßig aufgeteilt werden, wenn Sie die Aufteilung selbst vornehmen möchten. Sie müssen zu einer anderen JDBC-Methode (lowerbound,upperbound & numpartition) wir anstelle von (lowerbound,upperbound & numpartition) Prädikate direkt bereitstellen können.

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 

Link





partitioning