out-of-memory bytearrayoutputstream - Spark java.lang.OutOfMemoryError:Java heap space




io at (7)

My cluster: 1 master, 11 slaves, each node has 6 GB memory.

My settings:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Here is the problem:

First, I read some data (2.19 GB) from HDFS to RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Second, do something on this RDD:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Last, output to HDFS:

res.saveAsNewAPIHadoopFile(...)

When I run my program it shows:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

There are too many tasks?

PS: Every thing is ok when the input data is about 225 MB.

How can I solve this problem?


Answers

I have a few suggestions:

  • If your nodes are configured to have 6g maximum for Spark (and are leaving a little for other processes), then use 6g rather than 4g, spark.executor.memory=6g. Make sure you're using as much memory as possible by checking the UI (it will say how much mem you're using)
  • Try using more partitions, you should have 2 - 4 per CPU. IME increasing the number of partitions is often the easiest way to make a program more stable (and often faster). For huge amounts of data you may need way more than 4 per CPU, I've had to use 8000 partitions in some cases!
  • Decrease the fraction of memory reserved for caching, using spark.storage.memoryFraction. If you don't use cache() or persist in your code, this might as well be 0. It's default is 0.6, which means you only get 0.4 * 4g memory for your heap. IME reducing the mem frac often makes OOMs go away. UPDATE: From spark 1.6 apparently we will no longer need to play with these values, spark will determine them automatically.
  • Similar to above but shuffle memory fraction. If your job doesn't need much shuffle memory then set it to a lower value (this might cause your shuffles to spill to disk which can have catastrophic impact on speed). Sometimes when it's a shuffle operation that's OOMing you need to do the opposite i.e. set it to something large, like 0.8, or make sure you allow your shuffles to spill to disk (it's the default since 1.0.0).
  • Watch out for memory leaks, these are often caused by accidentally closing over objects you don't need in your lambdas. The way to diagnose is to look out for the "task serialized as XXX bytes" in the logs, if XXX is larger than a few k or more than an MB, you may have a memory leak. See https://stackoverflow.com/a/25270600/1586965
  • Related to above; use broadcast variables if you really do need large objects.
  • If you are caching large RDDs and can sacrifice some access time consider serialising the RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage. Or even caching them on disk (which sometimes isn't that bad if using SSDs).
  • (Advanced) Related to above, avoid String and heavily nested structures (like Map and nested case classes). If possible try to only use primitive types and index all non-primitives especially if you expect a lot of duplicates. Choose WrappedArray over nested structures whenever possible. Or even roll out your own serialisation - YOU will have the most information regarding how to efficiently back your data into bytes, USE IT!
  • (bit hacky) Again when caching, consider using a Dataset to cache your structure as it will use more efficient serialisation. This should be regarded as a hack when compared to the previous bullet point. Building your domain knowledge into your algo/serialisation can minimise memory/cache-space by 100x or 1000x, whereas all a Dataset will likely give is 2x - 5x in memory and 10x compressed (parquet) on disk.

http://spark.apache.org/docs/1.2.1/configuration.html

EDIT: (So I can google myself easier) The following is also indicative of this problem:

java.lang.OutOfMemoryError : GC overhead limit exceeded

You should increase the driver memory. In your $SPARK_HOME/conf folder you should find the file spark-defaults.conf, edit and set the spark.driver.memory 4000m depending on the memory on your master, I think. This is what fixed the issue for me and everything runs smoothly


To add a use case to this that is often not discussed, I will pose a solution when submitting a Spark application via spark-submit in local mode.

According to the gitbook Mastering Apache Spark by Jacek Laskowski:

You can run Spark in local mode. In this non-distributed single-JVM deployment mode, Spark spawns all the execution components - driver, executor, backend, and master - in the same JVM. This is the only mode where a driver is used for execution.

Thus, if you are experiencing OOM errors with the heap, it suffices to adjust the driver-memory rather than the executor-memory.

Here is an example:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

The location to set the memory heap size (at least in spark-1.0.0) is in conf/spark-env. The relevant variables are SPARK_EXECUTOR_MEMORY & SPARK_DRIVER_MEMORY. More docs are in the deployment guide

Also, don't forget to copy the configuration file to all the slave nodes.


You should configure offHeap memory settings as shown below:

`val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()`

Give the driver memory and executor memory as per your machines RAM availability. You can increase the offHeap size if you are still facing the OutofMemory issue.


Broadly speaking, spark Executor JVM memory can be divided into two parts. Spark memory and User memory. This is controlled by property spark.memory.fraction - the value is between 0 and 1. When working with images or doing memory intensive processing in spark applications, consider decreasing the spark.memory.fraction. This will make more memory available to your application work. Spark can spill, so it will still work with less memory share.

The second part of the problem is division of work. If possible, partition your data into smaller chunks. Smaller data possibly needs less memory. But if that is not possible, you are sacrifice compute for memory. Typically a single executor will be running multiple cores. Total memory of executors must be enough to handle memory requirements of all concurrent tasks. If increasing executor memory is not a option, you can decrease the cores per executor so that each task gets more memory to work with. Test with 1 core executors which have largest possible memory you can give and then keep increasing cores until you find the best core count.


  1. It sounds like your fragments leak - your fragment stays in memory with all their content (including the bitmap) even if onDestroy is called. You can use MAT + Heap dumps the see what exactly causes the leak (there are many guides available out there). The first suspects should be Singletons of out of fragment classes that you pass "this" from your fragments. As the first step I would clear everything from your fragment in onDestroy.

  2. Make the "SimpleImageLoadingListener()" an inner static class that holds your fragment as WeakReference - this will probably reduce the leak (or at least one of them).

  3. You should be using any 3rd ImageLoader as a Singleton with one LRU cache + one disk cache. You can find some good ones here (Any best third party tool to cache image in Android?)

  4. "largeHeap" in the manifest shouldn't be on your mind at all. Once you have a leak then you'll just have a larger heap to fill up until OOM.