scala spark - (Por qué)necesitamos llamar al caché o persistir en un RDD




map parallelize (5)

Cuando se crea un conjunto de datos distribuido (RDD) flexible a partir de un archivo de texto o colección (o de otro RDD), ¿necesitamos llamar explícitamente "caché" o "persistir" para almacenar los datos RDD en la memoria? ¿O los datos RDD están almacenados de forma distribuida en la memoria de manera predeterminada?

val textFile = sc.textFile("/user/emp.txt")

Según mi entendimiento, después del paso anterior, textFile es un RDD y está disponible en toda / parte de la memoria del nodo.

Si es así, ¿por qué tenemos que llamar "caché" o "persistir" en el archivo de texto RDD?


Answers

A continuación se muestran las tres situaciones en las que debe almacenar en caché sus RDD:

usando un RDD muchas veces

realizar múltiples acciones en el mismo RDD

para largas cadenas de transformaciones (o muy costosas)


Agregando otra razón para agregar (o agregar temporalmente) una llamada al método de cache .

para problemas de memoria de depuración

con el método de cache , la chispa dará información de depuración con respecto al tamaño del RDD. entonces en la interfaz de usuario integrada de chispa, obtendrá información de consumo de memoria RDD. y esto resultó ser muy útil para diagnosticar problemas de memoria.


¿Necesitamos llamar "caché" o "persistir" explícitamente para almacenar los datos RDD en la memoria?

Sí, solo si es necesario.

Los datos RDD almacenados de forma distribuida en la memoria de forma predeterminada?

¡No!

Y estas son las razones por las cuales:

  • Spark admite dos tipos de variables compartidas: variables de difusión, que se pueden usar para almacenar en caché un valor en la memoria en todos los nodos, y acumuladores, que son variables que solo se "agregan" a, como contadores y sumas.

  • Los RDD admiten dos tipos de operaciones: las transformaciones, que crean un nuevo conjunto de datos a partir de uno existente, y las acciones, que devuelven un valor al programa del controlador después de ejecutar un cálculo en el conjunto de datos. Por ejemplo, el mapa es una transformación que pasa cada elemento del conjunto de datos a través de una función y devuelve un nuevo RDD que representa los resultados. Por otro lado, reduce es una acción que agrega todos los elementos del RDD usando alguna función y devuelve el resultado final al programa del controlador (aunque también hay un parallelByKey paralelo que devuelve un conjunto de datos distribuidos).

  • Todas las transformaciones en Spark son flojas, ya que no calculan sus resultados de inmediato. En su lugar, solo recuerdan las transformaciones aplicadas a algún conjunto de datos base (por ejemplo, un archivo). Las transformaciones solo se computan cuando una acción requiere que se devuelva un resultado al programa del controlador. Este diseño permite que Spark funcione de manera más eficiente; por ejemplo, podemos darnos cuenta de que un conjunto de datos creado a través de un mapa se utilizará para reducir y devolver solo el resultado de la reducción al controlador, en lugar del conjunto de datos mapeados más grande.

  • De forma predeterminada, cada RDD transformado se puede volver a calcular cada vez que ejecuta una acción en él. Sin embargo, también puede persistir un RDD en la memoria utilizando el método persist (o caché), en cuyo caso Spark mantendrá los elementos en el clúster para un acceso mucho más rápido la próxima vez que lo consulte. También se admite el mantenimiento de RDD en el disco o la replicación en varios nodos.

Para más detalles, consulte la guía de programación Spark .


Creo que la pregunta sería mejor formulada como:

¿Cuándo debemos llamar al caché o persistir en un RDD?

Los procesos de chispa son flojos, es decir, nada sucederá hasta que se requiera. Para responder rápidamente a la pregunta, después de val textFile = sc.textFile("/user/emp.txt") , no ocurre nada con los datos, solo se construye un HadoopRDD , utilizando el archivo como fuente.

Digamos que transformamos esos datos un poco:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

De nuevo, nada sucede con los datos. Ahora hay un nuevo RDD wordsRDD que contiene una referencia a testFile y una función que se aplicará cuando sea necesario.

Solo cuando se wordsRDD.count una acción en un RDD, como wordsRDD.count , se wordsRDD.count la cadena RDD, llamada linaje . Es decir, los datos, desglosados ​​en particiones, serán cargados por los ejecutores del clúster Spark, se flatMap función flatMap y se calculará el resultado.

En un linaje lineal, como el de este ejemplo, cache() no es necesario. Los datos se cargarán a los ejecutores, todas las transformaciones se aplicarán y, finalmente, se computará el count , todo en la memoria, si los datos encajan en la memoria.

cache es útil cuando el linaje del RDD se ramifica. Digamos que quiere filtrar las palabras del ejemplo anterior en un conteo de palabras positivas y negativas. Podrías hacer esto así:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Aquí, cada rama emite una recarga de los datos. Agregar una declaración de cache explícita garantizará que el procesamiento realizado previamente se conserve y reutilice. El trabajo se verá así:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Por esa razón, se dice que el cache 'rompe el linaje' ya que crea un punto de control que puede reutilizarse para un procesamiento posterior.

Regla de oro: use la cache cuando el linaje de su RDD se bifurca o cuando un RDD se usa varias veces como en un bucle.


Puede tomar Polimorfismo a la carta como Polimorfismo a pedido .

La comunidad de Clojure está orgullosa del término Polimorfismo a la carta debido al hecho de que Clojure admite múltiples estrategias de polimorfismo. Algunos de ellos son:

  • Polimorfismo basado en prototipos

  • Polimorfismo de herencia

    Esta es la estrategia de polimorfismo utilizada por Java. Clojure admite esto por proxy. Útil al hacer la interoperabilidad de Java.

  • Protocolo

    El protocolo de Clojure es como TypeClass para Haskell.

  • Multimétodo

    Si bien los protocolos proporcionan despachos polimórficos basados ​​en el tipo del primer argumento, los multimétodos son mucho más flexibles y pueden enviarse según cualquier función de los argumentos del método (cualquiera).

Polimorfismo a la carta significa "Seleccione la mejor estrategia de polimorfismo para su caso. Están todos en su caja de herramientas".

Puede implementar el patrón TypeClass en Scala usando implicits. Lea la fuente de Scalaz si quiere ejemplos del mundo real. Scala no admite multimétodos a nivel de lenguaje, pero supongo que es posible con la ayuda de la próxima macro 2.10.

En cuanto a los beneficios, las estrategias avanzadas de polimorfismo como TypeClass y Multimethod pueden ayudar a resolver el problema de expresión .

"El objetivo es definir un tipo de datos por casos, donde uno puede agregar nuevos casos al tipo de datos y nuevas funciones sobre el tipo de datos, sin recompilar el código existente, y al mismo tiempo mantener la seguridad del tipo estático (por ejemplo, sin conversiones)".

Por cierto, esta pregunta es demasiado grande para caber en una sola pregunta de . Mi sugerencia es familiarizarme con estos conceptos, y luego comprenderá su utilidad.







scala apache-spark rdd