apache-spark - tutorial - using apache spark




Qual é a diferença entre mapa e flatMap e um bom caso de uso para cada um? (10)

A diferença pode ser vista abaixo da amostra do código pyspark:

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]


rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]

Alguém pode me explicar a diferença entre mapa e flatMap e o que é um bom caso de uso para cada um?

O que significa "achatar os resultados" significa? O que é bom para isso?


Aqui está um exemplo da diferença, como uma sessão de spark-shell :

Primeiro, alguns dados - duas linhas de texto:

val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines

rdd.collect

    res0: Array[String] = Array("Roses are red", "Violets are blue")

Agora, o map transforma um RDD de comprimento N em outro RDD de comprimento N.

Por exemplo, mapeia de duas linhas para duas comprimentos de linha:

rdd.map(_.length).collect

    res1: Array[Int] = Array(13, 16)

Mas flatMap (falando vagamente) transforma um RDD de comprimento N em uma coleção de N coleções, e então as mescla em um único RDD de resultados.

rdd.flatMap(_.split(" ")).collect

    res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")

Temos várias palavras por linha e várias linhas, mas acabamos com uma única matriz de palavras de saída

Apenas para ilustrar isso, flatMapping de uma coleção de linhas para uma coleção de palavras se parece com:

["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]

Os RDDs de entrada e saída serão, portanto, tipicamente de tamanhos diferentes para flatMap .

Se tivéssemos tentado usar o map com nossa função split , teríamos acabado com estruturas aninhadas (um RDD de matrizes de palavras, com o tipo RDD[Array[String]] ) porque temos que ter exatamente um resultado por entrada:

rdd.map(_.split(" ")).collect

    res3: Array[Array[String]] = Array(
                                     Array(Roses, are, red), 
                                     Array(Violets, are, blue)
                                 )

Finalmente, um caso especial útil é mapear com uma função que pode não retornar uma resposta e, portanto, retorna uma Option . Podemos usar flatMap para filtrar os elementos que retornam None e extrair os valores daqueles que retornam um Some :

val rdd = sc.parallelize(Seq(1,2,3,4))

def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None

rdd.flatMap(myfn).collect

    res3: Array[Int] = Array(10,20)

(observando aqui que uma opção se comporta como uma lista que tem um elemento ou zero elementos)


Flatmap e Map ambos transformam a coleção.

Diferença:

map (func)
Retorna um novo conjunto de dados distribuído formado pela passagem de cada elemento da fonte por meio de uma função func.

flatMap (func)
Semelhante ao mapa, mas cada item de entrada pode ser mapeado para 0 ou mais itens de saída (portanto func deve retornar um Seq em vez de um único item).

A função de transformação:
map : Um elemento em -> um elemento fora.
flatMap : Um elemento em -> 0 ou mais elementos (uma coleção).


Geralmente usamos o exemplo de contagem de palavras no hadoop. Vou pegar o mesmo caso de uso e vou usar map e flatMap e vamos ver a diferença como está processando os dados.

Abaixo está o arquivo de dados de amostra.

hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome

O arquivo acima será analisado usando map e flatMap .

Usando o map

>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']

A entrada tem 4 linhas e o tamanho de saída também é 4, isto é, N elementos ==> N elementos.

Usando flatMap

>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']

A saída é diferente do mapa.

Vamos atribuir 1 como valor para cada chave para obter a contagem de palavras.

  • fm : RDD criado usando flatMap
  • wc : RDD criado usando map
>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]

Considerando que o map no RDD wc fornecerá a saída abaixo indesejada:

>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]

Você não pode obter a contagem de palavras se o map for usado em vez de flatMap .

De acordo com a definição, a diferença entre o map e o flatMap é:

map : Retorna um novo RDD aplicando determinada função a cada elemento do RDD. Função no map retorna apenas um item.

flatMap : Semelhante ao map , ele retorna um novo RDD aplicando uma função a cada elemento do RDD, mas a saída é nivelada.


Se você está perguntando a diferença entre RDD.map e RDD.flatMap no Spark, o mapa transforma um RDD de tamanho N para outro de tamanho N. por exemplo.

myRDD.map(x => x*2)

por exemplo, se myRDD é composto por duplas.

Enquanto o flatMap pode transformar o RDD em um outro de tamanho diferente: por exemplo:

myRDD.flatMap(x =>new Seq(2*x,3*x))

que retornará um RDD de tamanho 2 * N ou

myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )

Tudo se resume à sua pergunta inicial: o que você quer dizer com achatamento ?

Quando você usa flatMap, uma coleção "multidimensional" torna - se coleção "unidimensional" .

val array1d = Array ("1,2,3", "4,5,6", "7,8,9")  
//array1d is an array of strings

val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )

val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)

Você quer usar um flatMap quando,

  • sua função de mapa resulta na criação de estruturas multicamadas
  • mas tudo o que você quer é uma estrutura unidimensional simples e plana, removendo TODOS os agrupamentos internos

map e flatMap são semelhantes, no sentido em que pegam uma linha a partir da entrada RDD e aplicam uma função nela. A maneira como eles diferem é que a função no map retorna apenas um elemento, enquanto a função no flatMap pode retornar uma lista de elementos (0 ou mais) como um iterador.

Além disso, a saída do flatMap é achatada. Embora a função em flatMap retorne uma lista de elementos, o flatMap retorna um RDD que possui todos os elementos da lista de forma plana (não uma lista).


RDD.map retorna todos os elementos em uma única matriz

RDD.flatMap retorna elementos em matrizes de matriz

vamos supor que temos texto no arquivo text.txt como

Spark is an expressive framework
This text is to understand map and faltMap functions of Spark RDD

Usando o mapa

val text=sc.textFile("text.txt").map(_.split(" ")).collect

saída:

text: **Array[Array[String]]** = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD))

Usando flatMap

val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect

saída:

 text: **Array[String]** = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD)

map retorna RDD de igual número de elementos enquanto flatMap não pode.

Um caso de uso de exemplo para flatMap Filtra dados ausentes ou incorretos.

Um exemplo de caso de uso para map Use em uma grande variedade de casos em que o número de elementos de entrada e saída é o mesmo.

number.csv

1
2
3
-
4
-
5

map.py adiciona todos os números em add.csv.

from operator import *

def f(row):
  try:
    return float(row)
  except Exception:
    return 0

rdd = sc.textFile('a.csv').map(f)

print(rdd.count())      # 7
print(rdd.reduce(add))  # 15.0

flatMap.py usa o flatMap para filtrar dados ausentes antes da adição. Menos números são adicionados em comparação com a versão anterior.

from operator import *

def f(row):
  try:
    return [float(row)]
  except Exception:
    return []

rdd = sc.textFile('a.csv').flatMap(f)

print(rdd.count())      # 5
print(rdd.reduce(add))  # 15.0

  • map (func) Retorna um novo dataset distribuído formado pela passagem de cada elemento da fonte através de uma função func declared.so map () é um termo único

enquanto

  • flatMap (func) Semelhante ao mapa, mas cada item de entrada pode ser mapeado para 0 ou mais itens de saída, portanto func deve retornar uma Sequência em vez de um único item.




apache-spark