What is the difference between map and flatMap and a good use case for each?





flatMap and map both transform the collection.

Difference:

map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.

flatMap(func)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

The transformation function:
map: One element in -> one element out.
flatMap: One element in -> 0 or more elements out (a collection).
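A minimal sketch of that distinction in spark-shell (sc comes from the shell; the RDD nums is made up for illustration):

val nums = sc.parallelize(Seq(1, 2, 3))

// map: exactly one output element per input element
nums.map(n => n * 10).collect()
//   Array(10, 20, 30)

// flatMap: each input element may produce 0 or more output elements
nums.flatMap(n => Seq.fill(n)(n)).collect()
//   Array(1, 2, 2, 3, 3, 3)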

Can someone explain to me the difference between map and flatMap and what is a good use case for each?

What does "flatten the results" mean? What is it good for?


map and flatMap are similar, in the sense that they take an element from the input RDD and apply a function to it. The way they differ is that the function in map returns only one element, while the function in flatMap can return a list of elements (0 or more) as an iterator.

Also, the output of flatMap is flattened. Although the function in flatMap returns a list of elements, flatMap returns an RDD containing all the elements from those lists in a flat way (not as lists).
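A tiny sketch of that "0 or more elements as an iterator" idea (sc comes from spark-shell; the function repeatIfEven is made up for illustration):

// the function handed to flatMap returns an Iterator: 0, 1 or many elements per input
def repeatIfEven(n: Int): Iterator[Int] =
  if (n % 2 == 0) Iterator(n, n) else Iterator.empty

sc.parallelize(Seq(1, 2, 3, 4)).flatMap(repeatIfEven).collect()
//   Array(2, 2, 4, 4)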


It boils down to your initial question: what do you mean by flattening?

When you use flatMap, a "multi-dimensional" collection becomes a "one-dimensional" collection.

val array1d = Array("1,2,3", "4,5,6", "7,8,9")
//array1d is an array of strings

val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )

val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)

You want to use flatMap when:

  • your map function results in creating multi-layered structures
  • but all you want is a simple, flat, one-dimensional structure, with ALL the internal groupings removed (see the sketch below)
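To make that concrete, here is a small sketch on plain Scala collections: map followed by flatten produces the same flat result as flatMap in one step (the names nested, flattened and direct are just for illustration):

val array1d = Array("1,2,3", "4,5,6", "7,8,9")

// map keeps one output per input, so the inner groupings remain
val nested: Array[Array[String]] = array1d.map(_.split(","))

// flatten removes the inner groupings afterwards...
val flattened: Array[String] = nested.flatten

// ...which is exactly what flatMap does in a single step
val direct: Array[String] = array1d.flatMap(_.split(","))

assert(flattened.sameElements(direct))  // both are Array(1, 2, 3, 4, 5, 6, 7, 8, 9)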

Using test.md as an example:

➜  spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.

scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3

scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15

scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))

scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)

If you use the map method, count() returns the number of lines in test.md; with the flatMap method, it returns the number of words.

Both map and flatMap return a new RDD. map is typically used for one-to-one transformations, whereas flatMap is commonly used to split each line into words.
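As a follow-up to the "split words" point, the classic word count builds directly on that flatMap. A sketch, assuming the same test.md and the sc from spark-shell:

val wordCounts = sc.textFile("test.md")
  .flatMap(line => line.split(" "))   // one element per word instead of per line
  .map(word => (word, 1))             // pair each word with a count of 1
  .reduceByKey(_ + _)                 // sum the counts per word

wordCounts.collect()   // e.g. Array((This,3), (is,3), (the,3), ...)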


If you are asking about the difference between RDD.map and RDD.flatMap in Spark: map transforms an RDD of size N into another one of size N, e.g.

myRDD.map(x => x*2)

for example, if myRDD is composed of Doubles.

flatMap, on the other hand, can transform the RDD into another one of a different size, e.g.:

myRDD.flatMap(x => Seq(2 * x, 3 * x))

which will return an RDD of size 2*N, or

myRDD.flatMap(x => if (x < 10) Seq(2 * x, 3 * x) else Seq(x))
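A quick spark-shell check of those sizes (a sketch, assuming a small sample RDD of Doubles named myRDD):

val myRDD = sc.parallelize(Seq(1.0, 5.0, 20.0))

myRDD.map(x => x * 2).count()                                          // 3, the same size N
myRDD.flatMap(x => Seq(2 * x, 3 * x)).count()                          // 6, i.e. 2 * N
myRDD.flatMap(x => if (x < 10) Seq(2 * x, 3 * x) else Seq(x)).count()  // 5, somewhere in between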

RDD.map followed by collect returns the results as an array of arrays (one inner array per input element).

RDD.flatMap followed by collect returns all the elements in a single flat array.

Let's assume we have the following text in a file text.txt:

Spark is an expressive framework
This text is to understand map and flatMap functions of Spark RDD

Using map

val text=sc.textFile("text.txt").map(_.split(" ")).collect

output:

text: Array[Array[String]] = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, flatMap, functions, of, Spark, RDD))

Using flatMap

val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect

output:

text: Array[String] = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, flatMap, functions, of, Spark, RDD)

The difference can be seen from the sample PySpark code below:

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]


rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]

map: It returns a new RDD by applying a function to each element of the RDD. The function in map can return only one item.

flatMap: Similar to map, it returns a new RDD by applying a function to each element of the RDD, but the output is flattened.

Also, the function in flatMap can return a list of elements (0 or more).

For example:

sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

Output: [[1, 2], [1, 2, 3], [1, 2, 3, 4]]

sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()

Output (notice the output is flattened out into a single list): [1, 2, 1, 2, 3, 1, 2, 3, 4]

Source: https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/


map returns an RDD with the same number of elements, while flatMap may not.

An example use case for flatMap: filtering out missing or incorrect data.

An example use case for map: the wide variety of cases where the number of input and output elements is the same.

number.csv

1
2
3
-
4
-
5

map.py adds all the numbers in number.csv.

from operator import *

def f(row):
  try:
    return float(row)
  except Exception:
    return 0

rdd = sc.textFile('number.csv').map(f)

print(rdd.count())      # 7
print(rdd.reduce(add))  # 15.0

flatMap.py uses flatMap to filter out the missing data before the addition. Fewer numbers are added compared to the previous version.

from operator import *

def f(row):
  try:
    return [float(row)]
  except Exception:
    return []

rdd = sc.textFile('number.csv').flatMap(f)

print(rdd.count())      # 5
print(rdd.reduce(add))  # 15.0

For all those who wanted a PySpark-related example:

Example transformation: flatMap

>>> a="hello what are you doing"
>>> a.split()

['hello', 'what', 'are', 'you', 'doing']

>>> b=["hello what are you doing","this is rak"]
>>> b.split()

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'list' object has no attribute 'split'

>>> rline=sc.parallelize(b)
>>> type(rline)

<class 'pyspark.rdd.RDD'>
>>> def fwords(x):
...     return x.split()


>>> rword=rline.map(fwords)
>>> rword.collect()

[['hello', 'what', 'are', 'you', 'doing'], ['this', 'is', 'rak']]

>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()

['hello', 'what', 'are', 'you', 'doing', 'this', 'is', 'rak']

Hope it helps :)


Here is an example of the difference, as a spark-shell session:

First, some data - two lines of text:

val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines

rdd.collect

    res0: Array[String] = Array("Roses are red", "Violets are blue")

Now, map transforms an RDD of length N into another RDD of length N.

For example, it maps from two lines into two line-lengths:

rdd.map(_.length).collect

    res1: Array[Int] = Array(13, 16)

But flatMap (loosely speaking) transforms an RDD of length N into a collection of N collections, then flattens these into a single RDD of results.

rdd.flatMap(_.split(" ")).collect

    res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")

We have multiple words per line, and multiple lines, but we end up with a single output array of words.

Just to illustrate that, flatMapping from a collection of lines to a collection of words looks like:

["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]

The input and output RDDs will therefore typically be of different sizes for flatMap.

If we had tried to use map with our split function, we'd have ended up with nested structures (an RDD of arrays of words, with type RDD[Array[String]]) because we have to have exactly one result per input:

rdd.map(_.split(" ")).collect

    res3: Array[Array[String]] = Array(
                                     Array(Roses, are, red), 
                                     Array(Violets, are, blue)
                                 )

Finally, one useful special case is mapping with a function which might not return an answer, and so returns an Option. We can use flatMap to filter out the elements that return None and extract the values from those that return a Some:

val rdd = sc.parallelize(Seq(1,2,3,4))

def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None

rdd.flatMap(myfn).collect

    res3: Array[Int] = Array(10,20)

(noting here that an Option behaves rather like a list that has either one element, or zero elements)
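The same Option trick is handy for dropping records that fail to parse. A sketch, assuming a small made-up RDD of raw strings and scala.util.Try:

import scala.util.Try

val raw = sc.parallelize(Seq("1", "2", "-", "3", "oops"))

// Try(...).toOption yields Some(parsed value) or None; flatMap keeps only the Somes
raw.flatMap(s => Try(s.toInt).toOption).collect()   // Array(1, 2, 3)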


Difference in output of map and flatMap:

1. flatMap:

val a = sc.parallelize(1 to 10, 5)

a.flatMap(1 to _).collect()

Output:

 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

2. map:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.map(_.length).collect()

Output:

Array(3, 6, 6, 3, 8)

A DataFrame is equivalent to a table in an RDBMS and can also be manipulated in similar ways to the "native" distributed collections in RDDs. Unlike RDDs, DataFrames keep track of the schema and support various relational operations that lead to more optimized execution. Each DataFrame object represents a logical plan, but because of their "lazy" nature no execution occurs until the user calls a specific "output operation".
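To tie that back to the question: on a DataFrame, a rough analogue of the flatMap-over-words example is split plus explode. A sketch, assuming the spark session and implicits available in spark-shell:

import org.apache.spark.sql.functions.{explode, split}
import spark.implicits._

val df = Seq("Roses are red", "Violets are blue").toDF("line")

// split produces an array column; explode emits one row per array element,
// much like flatMap emits one output element per word
df.select(explode(split($"line", " ")).as("word")).show()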




