scala - Org.apache.spark.sql.Dataset এ java.lang. নালপয়েন্টার এক্সসেপশন এর কারণ:




apache-spark dataframe (2)

নীচে আমি আমার কোড সরবরাহ করি। আমি ডেটাফ্রেম prodRows পুনরাবৃত্তি prodRows এবং প্রতিটি product_PK জন্য product_PK আমি প্রোড্রো থেকে prodRows এর কিছু মিলের সাব-লিস্ট খুঁজে prodRows

  numRecProducts = 10
  var listOfProducts: Map[Long,Array[(Long, Int)]] = Map()
  prodRows.foreach{ row : Row =>
      val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
      val gender = row.get(row.fieldIndex("gender_PK")).toString
      val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
      var productList: Array[(Long, Int)] = Array()
      if (!selection.rdd.isEmpty()) {
        productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
      }
    listOfProducts = listOfProducts + (product_PK -> productList)
  }

তবে আমি যখন এটি কার্যকর করি তখন এটি আমাকে নীচের ত্রুটি দেয়। দেখে মনে হচ্ছে selection কয়েকটি পুনরাবৃত্তিতে খালি রয়েছে। তবে আমি বুঝতে পারি না কীভাবে আমি এই ত্রুটিটি পরিচালনা করতে পারি:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
    at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
    at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
    at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

এর অর্থ কী এবং আমি কীভাবে এটি পরিচালনা করতে পারি?


সমস্যাটি হ'ল আপনি prodRows মধ্যে থেকে prodRows অ্যাক্সেস করার চেষ্টা prodRows.foreach । আপনি কোনও রূপান্তরের মধ্যে ডেটাফ্রেম ব্যবহার করতে পারবেন না, কেবল ডেটাফ্রেমগুলি ড্রাইভারের মধ্যে থাকে।


স্পার্কের ডেটাফ্রেম / আরডিডি রূপান্তরগুলির একটিতে ফাংশনটির মধ্যে থেকে আপনি স্পার্কের কোনও "ড্রাইভার-সাইড" বিমূর্ততা (আরডিডি, ডেটা ফ্রেমস, ডেটাসেটস, স্পার্কসেশন ...) অ্যাক্সেস করতে পারবেন না। আপনি এই ফাংশনগুলির মধ্যে থেকে ড্রাইভার-পার্শ্বের পরিবর্তনযোগ্য বস্তুগুলিও আপডেট করতে পারবেন না।

আপনার ক্ষেত্রে - আপনি ডেটাফ্রেম.ফেরোচকে দেওয়া একটি ফাংশনের মধ্যে prodRows এবং selection (উভয় ডেটা ফ্রেম) ব্যবহার করার চেষ্টা করছেন। আপনি একই ফাংশনটির মধ্যে থেকে তালিকা listOfProducts (স্থানীয় ড্রাইভার-পার্শ্ব ভেরিয়েবল) আপডেট করার চেষ্টা করছেন।

কেন?

  • ডেটা ফ্রেমস, আরডিডি এবং স্পার্কসেশন কেবলমাত্র আপনার ড্রাইভার অ্যাপ্লিকেশনটিতে বিদ্যমান। তারা শ্রমিক মেশিনগুলির ক্লাস্টারে বিতরণ করা ডেটা অ্যাক্সেস করতে "হ্যান্ডেল" হিসাবে পরিবেশন করে।
  • আরডিডি / ডেটাফ্রেম ট্রান্সফর্মেশনগুলিতে প্রেরিত ক্রিয়াকলাপগুলি সিরিয়ালযুক্ত হয়ে যায় এবং সেই ক্লাস্টারে প্রেরণ করা হয়, প্রতিটি কর্মী মেশিনের ডেটা পার্টিশনে চালিত হতে পারে। সিরিয়ালযুক্ত ডেটাফ্রেমস / আরডিডিগুলি যখন সেই মেশিনগুলিতে ডিসরিয়ালাইজড হয়ে যায় - তারা অকেজো হয় তবে তারা এখনও ক্লাস্টারের ডেটা উপস্থাপন করতে পারে না কারণ তারা চালক অ্যাপ্লিকেশনটিতে তৈরি হওয়া কেবল ফাঁকা অনুলিপি থাকে, যা আসলে ক্লাস্টারের সাথে সংযোগ বজায় রাখে মেশিন
  • একই কারণে ড্রাইভার-পার্শ্বের ভেরিয়েবলগুলি আপডেট করার চেষ্টা ব্যর্থ হবে: ভেরিয়েবলগুলি (খালি হিসাবে শুরু করা, বেশিরভাগ ক্ষেত্রেই) প্রতিটি ক্রিয়াকলাপকে ডিজিটালাইজড করা হবে, কর্মীদের উপর স্থানীয়ভাবে আপডেট করা হবে এবং সেখানে থাকবে .. মূল ড্রাইভার সাইড ভেরিয়েবল অপরিবর্তিত থাকবে

আপনি কীভাবে এটি সমাধান করতে পারেন? স্পার্কের সাথে কাজ করার সময়, বিশেষত ডেটাফ্রেমগুলির সাথে, আপনার ডেটার উপর "পুনরাবৃত্তি" এড়াতে চেষ্টা করা উচিত এবং এর পরিবর্তে ডেটাফ্রেমের ঘোষণামূলক ক্রিয়াকলাপগুলি ব্যবহার করা উচিত। বেশিরভাগ ক্ষেত্রে, আপনি যখন আপনার ডেটাফ্রেমের প্রতিটি রেকর্ডের জন্য অন্য ডেটাফ্রেমের ডেটা উল্লেখ করতে চান, আপনি দুটি ডেটা ফ্রেম থেকে ডেটা সংযুক্ত করে রেকর্ড সহ একটি নতুন ডেটাফ্রেম তৈরি করতে join ব্যবহার করতে চাইবেন।

এই নির্দিষ্ট ক্ষেত্রে, এখানে প্রায় সমতুল্য সমাধান যা আপনি যা করার চেষ্টা করছেন তা করে, যদি আমি এটি সঠিকভাবে উপস্থাপন করতে পারি তবে। এটি ব্যবহার করার চেষ্টা করুন এবং বিশদটি বের করার জন্য ডেটা ফ্রেম ডকুমেন্টেশনটি পড়ুন:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val numRecProducts = 10

val result = prodRows.as("left")
  // self-join by gender:
  .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
  // limit to 10 results per record:
  .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
  .filter($"rn" <= numRecProducts).drop($"rn")
  // group and collect_list to create products column:
  .groupBy($"left.product_PK" as "product_PK")
  .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")