scala - Auslöser: java.lang.NullPointerException at org.apache.spark.sql.Dataset
apache-spark dataframe (2)
Unten gebe ich meinen Code an.
Ich durchlaufe die DataFrame-
prodRows
und
prodRows
für jedes
product_PK
eine passende Unterliste von product_PKs aus
prodRows
.
numRecProducts = 10
var listOfProducts: Map[Long,Array[(Long, Int)]] = Map()
prodRows.foreach{ row : Row =>
val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
val gender = row.get(row.fieldIndex("gender_PK")).toString
val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
var productList: Array[(Long, Int)] = Array()
if (!selection.rdd.isEmpty()) {
productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
}
listOfProducts = listOfProducts + (product_PK -> productList)
}
Aber wenn ich es ausführe, gibt es mir den folgenden Fehler.
In einigen Iterationen scheint die
selection
leer zu sein.
Ich verstehe jedoch nicht, wie ich mit diesem Fehler umgehen kann:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74)
at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Was bedeutet es und wie kann ich damit umgehen?
Das Problem ist, dass Sie versuchen, über
prodRows
auf
prodRows
prodRows.foreach
.
Sie können einen Datenrahmen nicht innerhalb einer Umwandlung verwenden. Datenrahmen sind nur auf dem Treiber vorhanden.
Sie können innerhalb einer Funktion, die an eine der DataFrame / RDD-Transformationen von Spark übergeben wird, nicht auf "treiberseitige" Abstraktionen (RDDs, DataFrames, Datasets, SparkSession ...) von Spark zugreifen. Innerhalb dieser Funktionen können Sie auch keine fahrerseitigen veränderbaren Objekte aktualisieren.
In Ihrem Fall versuchen Sie,
prodRows
und
selection
(beide sind DataFrames) in einer an
DataFrame.foreach
Funktion zu
DataFrame.foreach
.
Sie versuchen auch,
listOfProducts
(eine lokale
listOfProducts
Variable) über dieselbe Funktion zu
aktualisieren
.
Warum?
- DataFrames, RDDs und SparkSession sind nur in Ihrer Treiberanwendung vorhanden. Sie dienen als "Handle" für den Zugriff auf Daten, die über den Cluster von Arbeitscomputern verteilt sind.
- An RDD / DataFrame-Transformationen übergebene Funktionen werden serialisiert und an diesen Cluster gesendet, um auf den Datenpartitionen auf jedem der Worker-Computer ausgeführt zu werden. Wenn die serialisierten DataFrames / RDDs auf diesen Computern deserialisiert werden - sie sind unbrauchbar und können die Daten im Cluster nicht darstellen, da es sich lediglich um leere Kopien der in der Treiberanwendung erstellten handelt, die tatsächlich eine Verbindung zum Cluster aufrechterhält maschinen
- Aus demselben Grund schlägt der Versuch fehl, fahrerseitige Variablen zu aktualisieren: Die Variablen (die in den meisten Fällen als leer beginnen) werden serialisiert, für jeden Worker deserialisiert, werden lokal auf den Workern aktualisiert und bleiben dort. Die ursprüngliche fahrerseitige Variable bleibt unverändert
Wie können Sie das lösen? Wenn Sie mit Spark arbeiten, insbesondere mit DataFrames, sollten Sie versuchen, eine "Iteration" der Daten zu vermeiden, und stattdessen deklarative Operationen von DataFrame verwenden. Wenn Sie auf Daten eines anderen DataFrames für jeden Datensatz in Ihrem DataFrame verweisen möchten, möchten Sie in den meisten Fällen einen Join verwenden, um einen neuen DataFrame mit Datensätzen zu erstellen, in denen Daten aus den beiden DataFrames kombiniert werden.
In diesem speziellen Fall ist hier eine ungefähr gleichwertige Lösung, die das tut, was Sie versuchen, wenn ich es richtig abschließen konnte. Versuchen Sie, dies zu verwenden, und lesen Sie die DataFrame-Dokumentation, um die Details herauszufinden:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val numRecProducts = 10
val result = prodRows.as("left")
// self-join by gender:
.join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
// limit to 10 results per record:
.withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
.filter($"rn" <= numRecProducts).drop($"rn")
// group and collect_list to create products column:
.groupBy($"left.product_PK" as "product_PK")
.agg(collect_list(struct($"right.product_PK", lit(1))) as "products")