apache spark - জেডিবিসি উত্স থেকে ডেটা স্থানান্তর করার সময় পার্টিশনটি কীভাবে অনুকূল করা যায়?




apache-spark jdbc (2)

  1. ইনপুট ডেটার পরিমাণ এবং আপনার ক্লাস্টারের সংস্থান প্রদত্ত আপনাকে কতগুলি পার্টিশন প্রয়োজন তা নির্ধারণ করুন। থাম্বের নিয়ম হিসাবে কঠোরভাবে প্রয়োজন না হলে পার্টিশন ইনপুটটি 1GB এর অধীনে রাখা ভাল। এবং ব্লক আকারের সীমা চেয়ে কড়াভাবে ছোট।

    আপনি পূর্বে বলেছেন যে আপনি 1TB ডেটা মানগুলি বিভিন্ন পোস্টে ব্যবহার করেন (5 - 70) সাবলীল প্রক্রিয়াটি নিশ্চিত করার পক্ষে কম উপায়।

    এমন মানটি ব্যবহার করার চেষ্টা করুন যার জন্য আরও repartitioning প্রয়োজন হবে না।

  2. আপনার তথ্য জানুন।

    পছন্দসই সংখ্যক পার্টিশনের মধ্যে বিতরণ করার জন্য উচ্চ কার্ডিনালিটি এবং অভিন্ন বিতরণ সহ কোনও কলাম রয়েছে কিনা তা নির্ধারণ করতে ডেটাসেটে উপলব্ধ কলামগুলি বিশ্লেষণ করুন। এগুলি আমদানি প্রক্রিয়াটির জন্য ভাল প্রার্থী। অতিরিক্তভাবে আপনার মানগুলির একটি সঠিক পরিসর নির্ধারণ করা উচিত।

    বিভিন্ন কেন্দ্রীভূততা এবং স্কিউনেস পরিমাপের পাশাপাশি হিস্টোগ্রাম এবং বেসিক গণনা দ্বারা কী একত্রিত করা ভাল অনুসন্ধানের সরঞ্জাম। এই অংশটির জন্য স্পার্কে আনার পরিবর্তে সরাসরি ডাটাবেসে ডেটা বিশ্লেষণ করা ভাল।

    width_bucket উপর নির্ভর করে আপনি partitionColumn lowerBound , upperBound , numPartitons , নাম upperBound লোড করার পরে স্পার্কে কীভাবে ডেটা বিতরণ করা হবে তার একটি শালীন ধারণা পেতে আপনি width_bucket (পোস্টগ্রিএসকিউএল, ওরাকল) বা সমমানের ফাংশন ব্যবহার করতে সক্ষম হতে পারেন।

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
  3. যদি কোনও কলাম না থাকে যা উপরের মানদণ্ডগুলিকে সন্তুষ্ট করে:

    • একটি কাস্টম তৈরি করা এবং এটির মাধ্যমে প্রকাশ করা। একটি দৃশ্য. একাধিক স্বতন্ত্র কলামের উপর হ্যাশগুলি সাধারণত ভাল প্রার্থী হয়। এখানে ব্যবহার করা যেতে পারে এমন ফাংশনগুলি নির্ধারণ করতে আপনার ডাটাবেস ম্যানুয়ালটি পরামর্শ করুন ( DBMS_CRYPTO pgcrypto , পোস্টগ্রাইএসকিউএলে pgcrypto ) *।
    • একসাথে নেওয়া স্বতন্ত্র কলামগুলির একটি সেট ব্যবহার করে উচ্চ পর্যাপ্ত কার্ডিনালিটি সরবরাহ করে।

      Allyচ্ছিকরূপে, আপনি যদি কোনও পার্টিশনযুক্ত এইচআইভি টেবিলটিতে লিখতে চলেছেন তবে আপনার মধুচী বিভাজন কলামগুলি সহ বিবেচনা করা উচিত। এটি পরে উত্পন্ন ফাইলের সংখ্যা সীমাবদ্ধ করতে পারে।

  4. বিভাজন যুক্তি প্রস্তুত

    • পূর্ববর্তী পদক্ষেপে নির্বাচিত বা তৈরি করা কলামটি যদি সংখ্যাসূচক হয় ( বা স্পার্কে তারিখ / টাইমস্ট্যাম্প> = 2.4 ) এটি সরাসরি partitionColumn হিসাবে সরবরাহ করে এবং lowerBound এবং upperBound পূরণ করার আগে নির্ধারিত পরিসীমা মানগুলি ব্যবহার করে।

      যদি সীমাবদ্ধ মানগুলি lowerBound জন্য ডেটা ( min(col) , lowerBound জন্য max(col) min(col) এর বৈশিষ্ট্যগুলি প্রতিফলিত না করে তবে এটি সাবধানতার সাথে থ্রেডের ফলে গুরুত্বপূর্ণ ডেটা upperBound পারে। সবচেয়ে খারাপ পরিস্থিতিতে, যখন সীমাগুলি তথ্যের ব্যাপ্তিটি কভার করে না, সমস্ত রেকর্ডগুলি একটি একক মেশিন দ্বারা এনে দেওয়া হবে, এটি মোটেও পার্টিশন না করার চেয়ে ভাল করে তোলে।

    • পূর্ববর্তী পদক্ষেপে নির্বাচিত কলামটি শ্রেণিবদ্ধ হয় বা কলামগুলির একটি সেট পারস্পরিক একচেটিয়া পূর্বাভাসগুলির একটি তালিকা উত্পন্ন করে যা সম্পূর্ণরূপে ডেটা কভার করে এমন একটি ফর্মে যেখানে একটি SQL যেখানে ধারা ব্যবহার করা যেতে পারে in

      উদাহরণস্বরূপ, যদি আপনার মানগুলির সাথে কলাম A a1 , a2 , a3 } এবং কলাম B রয়েছে { b1 , b2 , b3 }:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"

      দু'বার চেক করুন যে শর্তগুলি ওভারল্যাপ করে না এবং সমস্ত সংমিশ্রণ coveredাকা রয়েছে। যদি এই শর্তগুলি সন্তুষ্ট না হয় তবে আপনি যথাক্রমে সদৃশ বা নথিভুক্ত রেকর্ডগুলি সহ শেষ করবেন।

      jdbc কলটিতে যুক্তির predicates হিসাবে ডেটা পাস করুন। নোট করুন যে পার্টিশনের সংখ্যা পূর্বাভাসের সংখ্যার সমান হবে।

  5. পঠনযোগ্য মোডে ডাটাবেস রাখুন (চলমান লেখাগুলি ডেটার অসঙ্গতি সৃষ্টি করতে পারে। সম্ভব হলে পুরো প্রক্রিয়াটি শুরু করার আগে আপনার ডাটাবেসটি লক করা উচিত, তবে যদি সম্ভব না হয় তবে আপনার প্রতিষ্ঠানে) in

  6. পার্টিশনের সংখ্যা যদি পুনরায় বিভাজন ছাড়াই পছন্দসই আউটপুট লোড ডেটার সাথে মিলে যায় এবং সরাসরি সিঙ্কে ফেলে দেয়, না হলে আপনি ধাপ 1-এর মতো একই বিধি অনুসরণ করে পুনরায় ভাগ করার চেষ্টা করতে পারেন।

  7. আপনি যদি এখনও কোনও সমস্যার সম্মুখীন হন তা নিশ্চিত করুন যে আপনি স্পার্ক মেমরি এবং জিসি বিকল্পগুলি সঠিকভাবে কনফিগার করেছেন।

  8. উপরের কোনওটি যদি কাজ না করে:

    • আপনার ডেটাটিকে কোনও নেটওয়ার্কে ফেলে দেওয়ার / COPY TO মতো সরঞ্জামগুলি ব্যবহার করে স্টোরেজ বিতরণ করার বিষয়টি বিবেচনা করুন এবং সেখান থেকে সরাসরি পড়ুন।

      নোট করুন বা স্ট্যান্ডার্ড ডাটাবেস ইউটিলিটিগুলির জন্য আপনার সাধারণত একটি পসিক্স কমপ্লায়েন্ট ফাইল সিস্টেমের প্রয়োজন হবে, তাই এইচডিএফএস সাধারণত না করে।

      এই পদ্ধতির সুবিধাটি হ'ল আপনার কলামের বৈশিষ্ট্যগুলি নিয়ে চিন্তা করার দরকার নেই এবং ধারাবাহিকতা নিশ্চিত করার জন্য কেবল পঠনযোগ্য মোডে ডেটা রাখার দরকার নেই।

    • অ্যাপাচি স্কুওপের মতো ডেডিকেটেড বাল্ক ট্রান্সফার সরঞ্জামগুলি ব্যবহার করা এবং এরপরে ডেটা পুনরায় আকার দেওয়া।

* সিউডোকলমগুলি ব্যবহার করবেন না - স্পার্ক জেডিবিসি-তে সিউডোকলন ।

আমি পোস্টগ্রেএসকিউএল টেবিলের একটি টেবিল থেকে এইচডিএফএসের একটি মাতালকের টেবিলে ডেটা স্থানান্তরিত করার চেষ্টা করছি। এটি করতে, আমি নিম্নলিখিত কোডটি নিয়ে এসেছি:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

ডেটা prtn_String_columns: source_system_name, period_year, period_num টেবিলের মধ্যে prtn_String_columns: source_system_name, period_year, period_num

স্পার্ক-জমা ব্যবহৃত:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

নির্বাহী লগগুলিতে নিম্নলিখিত ত্রুটি বার্তাগুলি উত্পন্ন হয়:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

আমি লগগুলিতে দেখতে পাচ্ছি যে নীচের হিসাবে প্রদত্ত পার্টিশনের সংখ্যাটি সহ সঠিকভাবে পঠন করা হচ্ছে:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

নীচে পর্যায়ক্রমে নির্বাহকদের অবস্থা:

তথ্যটি সঠিকভাবে বিভাজন করা হচ্ছে না। একটি পার্টিশন ছোট যখন অন্যটি বিশাল হয়। এখানে একটি স্কিউ সমস্যা আছে। হাইভ টেবিলে ডেটা data spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") সময় লাইনটিতে কাজটি ব্যর্থ হয়: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") তবে আমি বুঝতে পারি ডেটা স্কুয়ের কারণে এটি ঘটছে সমস্যা।

আমি এক্সিকিউটারের সংখ্যা বাড়ানোর চেষ্টা করেছি, এক্সিকিউটারের মেমোরি বাড়িয়েছি, ড্রাইভার মেমোরি করেছি, কেবলমাত্র একটি সিভির ফাইল হিসাবে সংরক্ষণ করার চেষ্টা করেছি ডেভেলফ্রেমকে এইচআইভির টেবিলে সংরক্ষণ করার পরিবর্তে কোনও কিছুই ব্যতিক্রমী হওয়া থেকে মৃত্যুদন্ড কার্যকর করে না:

java.lang.OutOfMemoryError: GC overhead limit exceeded

কোডটিতে এমন কিছু আছে যা আমার সংশোধন করা দরকার? কেউ কি আমাকে জানাতে পারেন আমি কীভাবে এই সমস্যাটি সমাধান করতে পারি?


আমার অভিজ্ঞতায় 4 ধরণের মেমরি সেটিংস রয়েছে যা একটি পার্থক্য করে:

ক) [১] প্রক্রিয়াকরণের কারণে ডেটা সংরক্ষণের জন্য মেমরি ভিএস [২] প্রোগ্রাম স্ট্যাক ধরে রাখার জন্য হ্যাপ স্পেস

খ) [1] ড্রাইভার ভিএস [2] এক্সিকিউটার মেমরি

এখন অবধি, আমি যথাযথ ধরণের স্মৃতিশক্তি বাড়িয়ে আমার স্পার্ক কাজগুলি সফলভাবে চালাতে সক্ষম হয়েছি:

A2-B1 এর পরে প্রোগ্রামটি স্ট্যাকটি ধরে রাখতে ড্রাইভারের কাছে স্মৃতি উপলব্ধ রয়েছে। প্রভৃতি

সম্পত্তির নাম নিম্নরূপ:

এ 1-বি 1) executor-memory

এ 1-বি 2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

মনে রাখবেন যে সমস্ত * -B1 এর যোগফল আপনার কর্মীদের উপলব্ধ মেমরির চেয়ে কম এবং সমস্ত * -B2 এর যোগফল আপনার ড্রাইভার নোডের মেমরির চেয়ে কম হতে হবে।

আমার বাজিটি হ'ল, অপরাধী হ'ল সাহসীভাবে চিহ্নিত করা হ্যাপ সেটিংসগুলির মধ্যে একটি।







partitioning