scala - Auslöser: java.lang.NullPointerException at org.apache.spark.sql.Dataset




apache-spark dataframe (2)

Unten gebe ich meinen Code an. Ich durchlaufe die DataFrame- prodRows und prodRows für jedes product_PK eine passende Unterliste von product_PKs aus prodRows .

  numRecProducts = 10
  var listOfProducts: Map[Long,Array[(Long, Int)]] = Map()
  prodRows.foreach{ row : Row =>
      val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
      val gender = row.get(row.fieldIndex("gender_PK")).toString
      val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
      var productList: Array[(Long, Int)] = Array()
      if (!selection.rdd.isEmpty()) {
        productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
      }
    listOfProducts = listOfProducts + (product_PK -> productList)
  }

Aber wenn ich es ausführe, gibt es mir den folgenden Fehler. In einigen Iterationen scheint die selection leer zu sein. Ich verstehe jedoch nicht, wie ich mit diesem Fehler umgehen kann:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
    at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
    at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
    at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Was bedeutet es und wie kann ich damit umgehen?


Das Problem ist, dass Sie versuchen, über prodRows auf prodRows prodRows.foreach . Sie können einen Datenrahmen nicht innerhalb einer Umwandlung verwenden. Datenrahmen sind nur auf dem Treiber vorhanden.


Sie können innerhalb einer Funktion, die an eine der DataFrame / RDD-Transformationen von Spark übergeben wird, nicht auf "treiberseitige" Abstraktionen (RDDs, DataFrames, Datasets, SparkSession ...) von Spark zugreifen. Innerhalb dieser Funktionen können Sie auch keine fahrerseitigen veränderbaren Objekte aktualisieren.

In Ihrem Fall versuchen Sie, prodRows und selection (beide sind DataFrames) in einer an DataFrame.foreach Funktion zu DataFrame.foreach . Sie versuchen auch, listOfProducts (eine lokale listOfProducts Variable) über dieselbe Funktion zu aktualisieren .

Warum?

  • DataFrames, RDDs und SparkSession sind nur in Ihrer Treiberanwendung vorhanden. Sie dienen als "Handle" für den Zugriff auf Daten, die über den Cluster von Arbeitscomputern verteilt sind.
  • An RDD / DataFrame-Transformationen übergebene Funktionen werden serialisiert und an diesen Cluster gesendet, um auf den Datenpartitionen auf jedem der Worker-Computer ausgeführt zu werden. Wenn die serialisierten DataFrames / RDDs auf diesen Computern deserialisiert werden - sie sind unbrauchbar und können die Daten im Cluster nicht darstellen, da es sich lediglich um leere Kopien der in der Treiberanwendung erstellten handelt, die tatsächlich eine Verbindung zum Cluster aufrechterhält maschinen
  • Aus demselben Grund schlägt der Versuch fehl, fahrerseitige Variablen zu aktualisieren: Die Variablen (die in den meisten Fällen als leer beginnen) werden serialisiert, für jeden Worker deserialisiert, werden lokal auf den Workern aktualisiert und bleiben dort. Die ursprüngliche fahrerseitige Variable bleibt unverändert

Wie können Sie das lösen? Wenn Sie mit Spark arbeiten, insbesondere mit DataFrames, sollten Sie versuchen, eine "Iteration" der Daten zu vermeiden, und stattdessen deklarative Operationen von DataFrame verwenden. Wenn Sie auf Daten eines anderen DataFrames für jeden Datensatz in Ihrem DataFrame verweisen möchten, möchten Sie in den meisten Fällen einen Join verwenden, um einen neuen DataFrame mit Datensätzen zu erstellen, in denen Daten aus den beiden DataFrames kombiniert werden.

In diesem speziellen Fall ist hier eine ungefähr gleichwertige Lösung, die das tut, was Sie versuchen, wenn ich es richtig abschließen konnte. Versuchen Sie, dies zu verwenden, und lesen Sie die DataFrame-Dokumentation, um die Details herauszufinden:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val numRecProducts = 10

val result = prodRows.as("left")
  // self-join by gender:
  .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
  // limit to 10 results per record:
  .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
  .filter($"rn" <= numRecProducts).drop($"rn")
  // group and collect_list to create products column:
  .groupBy($"left.product_PK" as "product_PK")
  .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")