apache-spark spark - What is the difference between cache and persist?




example dataframe (5)

Caching or persistence are optimization techniques for (iterative and interactive) Spark computations. They help saving interim partial results so they can be reused in subsequent stages. These interim results as RDDs are thus kept in memory (default) or more solid storage like disk and/or replicated. RDDs can be cached using cache operation. They can also be persisted using persist operation.

persist, cache

These functions can be used to adjust the storage level of a RDD. When freeing up memory, Spark will use the storage level identifier to decide which partitions should be kept. The parameter less variants persist() and cache() are just abbreviations for persist(StorageLevel.MEMORY_ONLY).

Warning: Once the storage level has been changed, it cannot be changed again!

Warning -Cache judiciously... see ((Why) do we need to call cache or persist on a RDD)

Just because you can cache a RDD in memory doesn’t mean you should blindly do so. Depending on how many times the dataset is accessed and the amount of work involved in doing so, recomputation can be faster than the price paid by the increased memory pressure.

It should go without saying that if you only read a dataset once there is no point in caching it, it will actually make your job slower. The size of cached datasets can be seen from the Spark Shell..

Listing Variants...

def cache(): RDD[T]
 def persist(): RDD[T]
 def persist(newLevel: StorageLevel): RDD[T]

*See below example : *

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
     c.getStorageLevel
     res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)
     c.cache
     c.getStorageLevel
     res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)

The difference between cache and persist operations is purely syntactic. cache is a synonym of persist or persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY

Note : Due to the very small and purely syntactic difference between caching and persistence of RDDs the two terms are often used interchangeably.

See more visually here....

Persist in memory and disk:

Cache

Caching can improve the performance of your application to a great extent. Source

In terms of RDD persistence, what are the differences between cache() and persist() in spark ?


Spark gives 5 types of Storage level

  • MEMORY_ONLY
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK
  • MEMORY_AND_DISK_SER
  • DISK_ONLY

cache() will use MEMORY_ONLY. If you want to use something else, use persist(StorageLevel.<*type*>).

By default persist() will store the data in the JVM heap as unserialized objects.


With cache(), you use only the default storage level MEMORY_ONLY. With persist(), you can specify which storage level you want,(rdd-persistence).

From the official docs:

  • You can mark an RDD to be persisted using the persist() or cache() methods on it.
  • each persisted RDD can be stored using a different storage level
  • The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory).

Use persist() if you want to assign a storage level other than MEMORY_ONLY to the RDD (which storage level to choose)


There is no difference. From RDD.scala.

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

There are few important difference but the fundamental one is what happens with lineage. Persist / cache keeps lineage intact while checkpoint breaks lineage. Lets consider following examples:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
  • cache / persist:

    val indCache  = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    indCache.count
    // 3
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
  • checkpoint:

    val indChk  = rdd.mapValues(_ > 4)
    indChk.checkpoint
    
    // indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []
    
    indChk.count
    // 3
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
    

As you can see in the first case lineage is preserved even if data is fetched from the cache. It means that data can be recomputed from scratch if some partitions of indCache are lost. In the second case lineage is completely lost after the checkpoint and indChk doesn't carry an information required to rebuild it anymore.

checkpoint, unlike cache / persist is computed separately from other jobs. That's why RDD marked for checkpointing should be cached:

It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

Finally checkpointed data is persistent and not removed after SparkContext is destroyed.

Regarding data storage SparkContext.setCheckpointDir used by RDD.checkpoint requires DFS path if running in non-local mode. Otherwise it can be local files system as well. localCheckpoint and persist without replication should use local file system.

Note:

RDD checkpointing is a different concept than a chekpointing in Spark Streaming. The former one is designed to address lineage issue, the latter one is all about streaming reliability and failure recovery.







apache-spark distributed-computing rdd