



What is the difference between map and flatMap and a good use case for each?

Can someone explain to me the difference between map and flatMap and what is a good use case for each?

What does "flatten the results" mean? What is it good for?


map returns an RDD with the same number of elements as the input, while flatMap may not.

An example use case for flatMap: filtering out missing or incorrect data.

An example use case for map: the wide variety of transformations where the number of input and output elements is the same.

a.csv

1
2
3
-
4
-
5

map.py adds all the numbers in a.csv.

from operator import add

def f(row):
  try:
    # parse the row as a number
    return float(row)
  except ValueError:
    # missing/invalid rows ('-') count as 0
    return 0

rdd = sc.textFile('a.csv').map(f)

print(rdd.count())      # 7
print(rdd.reduce(add))  # 15.0

flatMap.py uses flatMap to filter out the missing data before the addition, so fewer numbers are added than in the previous version.

from operator import add

def f(row):
  try:
    # wrap valid numbers in a one-element list
    return [float(row)]
  except ValueError:
    # return an empty list so flatMap drops the row entirely
    return []

rdd = sc.textFile('a.csv').flatMap(f)

print(rdd.count())      # 5
print(rdd.reduce(add))  # 15.0

Difference in output of map and flatMap:

1. flatMap

val a = sc.parallelize(1 to 10, 5)

a.flatMap(1 to _).collect()

Output:

 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

2. map:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.map(_.length).collect()

Output:

3 6 6 3 8

For all those who wanted a PySpark-related example:

Example transformation: flatMap

>>> a="hello what are you doing"
>>> a.split()

['hello', 'what', 'are', 'you', 'doing']

>>> b=["hello what are you doing","this is rak"]
>>> b.split()

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'list' object has no attribute 'split'

>>> rline=sc.parallelize(b)
>>> type(rline)
<class 'pyspark.rdd.RDD'>

>>> def fwords(x):
...     return x.split()


>>> rword=rline.map(fwords)
>>> rword.collect()

[['hello', 'what', 'are', 'you', 'doing'], ['this', 'is', 'rak']]

>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()

['hello', 'what', 'are', 'you', 'doing', 'this', 'is', 'rak']

Hope it helps :)


Generally, the word count example is used in Hadoop. I will take the same use case, use both map and flatMap, and show the difference in how each processes the data.

Below is the sample data file.

hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome

The file above, loaded into an RDD named data (e.g., data = sc.textFile(...)), will be parsed using map and flatMap.

Using map

>>> wc = data.map(lambda line: line.split(" "))
>>> wc.collect()
[[u'hadoop', u'is', u'fast'], [u'hive', u'is', u'sql', u'on', u'hdfs'], [u'spark', u'is', u'superfast'], [u'spark', u'is', u'awesome']]

Input has 4 lines and output size is 4 as well, i.e., N elements ==> N elements.

Using flatMap

>>> fm = data.flatMap(lambda line: line.split(" "))
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']

The output is different from that of map.


Let's assign 1 as the value for each key to get the word count.

  • fm: RDD created by using flatMap
  • wc: RDD created using map
>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]

Whereas flatMap on the RDD wc gives the undesired output below:

>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]

You can't get the word count if map is used instead of flatMap.
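
To actually finish the word count from fm, reduce by key. A minimal sketch using PySpark's reduceByKey (the output ordering may vary across runs):

>>> from operator import add
>>> fm.map(lambda word: (word, 1)).reduceByKey(add).collect()
[(u'is', 4), (u'spark', 2), (u'hadoop', 1), (u'fast', 1), (u'hive', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'superfast', 1), (u'awesome', 1)]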

As per the definitions, the difference between map and flatMap is:

map: it returns a new RDD by applying the given function to each element of the RDD. The function in map returns only one item.

flatMap: similar to map, it returns a new RDD by applying a function to each element of the RDD, but the output is flattened.
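
A minimal PySpark sketch of both definitions (the input values are made up for illustration):

>>> sc.parallelize([1, 2, 3]).map(lambda x: [x, x * 10]).collect()
[[1, 10], [2, 20], [3, 30]]
>>> sc.parallelize([1, 2, 3]).flatMap(lambda x: [x, x * 10]).collect()
[1, 10, 2, 20, 3, 30]

map keeps the single item (here, a list) that the function returns, while flatMap flattens each returned list into the result.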


If you are asking about the difference between RDD.map and RDD.flatMap in Spark: map transforms an RDD of size N into another one of size N, e.g.

myRDD.map(x => x * 2)

if, for example, myRDD is composed of Doubles.

flatMap, on the other hand, can transform the RDD into another one of a different size, e.g.:

myRDD.flatMap(x => Seq(2 * x, 3 * x))

which will return an RDD of size 2*N, or

myRDD.flatMap(x => if (x < 10) Seq(2 * x, 3 * x) else Seq(x))

It boils down to your initial question: what do you mean by flattening?

When you use flatMap, a "multi-dimensional" collection becomes a "one-dimensional" collection.

val array1d = Array("1,2,3", "4,5,6", "7,8,9")
// array1d is an array of strings

val array2d = array1d.map(x => x.split(","))
// array2d will be: Array(Array("1","2","3"), Array("4","5","6"), Array("7","8","9"))

val flatArray = array1d.flatMap(x => x.split(","))
// flatArray will be: Array("1","2","3","4","5","6","7","8","9")

You want to use flatMap when:

  • your map function would create a multi-layered structure
  • but all you want is a simple, flat, one-dimensional structure, with the internal groupings removed (note that flatMap flattens one level of nesting per call; see the sketch below)
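
For instance, here is a minimal PySpark sketch (the nested list is made up for illustration) showing that flatMap removes exactly one level of grouping:

>>> nested = sc.parallelize([[1, [2, 3]], [4, [5]]])
>>> nested.flatMap(lambda x: x).collect()
[1, [2, 3], 4, [5]]

The outer grouping is gone, but the inner lists survive; a second flatMap (with an appropriate function) would be needed to flatten those too.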

The difference can be seen in the sample PySpark code below:

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]


rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]

Use test.md as an example:

➜  spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.

scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3

scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15

scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))

scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)

If you use the map method you get back one element per line of test.md (an array of words per line), so count() returns the number of lines; with the flatMap method you get one element per word, so count() returns the number of words.

map is similar to flatMap in that both return a new RDD. map is typically used when the function returns exactly one element per input element, while flatMap is typically used for things like splitting lines into words.


map: It returns a new RDD by applying a function to each element of the RDD. The function in .map can return only one item.

flatMap: Similar to map, it returns a new RDD by applying a function to each element of the RDD, but the output is flattened.

Also, the function in flatMap can return a list of elements (0 or more). For example:

>>> sc.parallelize([3, 4, 5]).map(lambda x: range(1, x)).collect()
[[1, 2], [1, 2, 3], [1, 2, 3, 4]]

>>> sc.parallelize([3, 4, 5]).flatMap(lambda x: range(1, x)).collect()
[1, 2, 1, 2, 3, 1, 2, 3, 4]

Notice the output in the second case is flattened out into a single list.

Source: https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/


  • map(func): returns a new distributed dataset formed by passing each element of the source through the function func, so map() produces exactly one output item per input item,

while

  • flatMap(func): similar to map, but each input item can be mapped to 0 or more output items, so func should return a sequence rather than a single item (see the sketch below).
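
A minimal PySpark sketch of the "0 or more" behaviour (the even/odd rule is made up for illustration):

>>> # emit nothing for odd numbers, two copies for even ones
>>> sc.parallelize([1, 2, 3, 4]).flatMap(lambda x: [x, x] if x % 2 == 0 else []).collect()
[2, 2, 4, 4]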



