scala - base_2 - spark unit test




Comment écrire des tests unitaires dans Spark 2.0+? (4)

J'ai essayé de trouver un moyen raisonnable de tester SparkSession avec le framework de test JUnit. Bien qu'il semble y avoir de bons exemples pour SparkContext , je n'ai pas pu trouver comment obtenir un exemple correspondant pour SparkSession , même s'il est utilisé à plusieurs endroits en interne dans spark-testing-base . Je serais ravi d’essayer une solution qui n’utilise pas non plus le test d’étincelles si ce n’est pas la bonne façon d’y aller.

Cas de test simple ( projet MWE complet avec build.sbt ):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

L'exécution de ceci avec JUnit est un NPE sur la ligne de charge:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Notez que le fait que le fichier en cours de chargement existe ou non dans une SparkSession correctement configurée, une erreur plus sensible sera émise.


Depuis Spark 1.6, vous pouvez utiliser SharedSQLContext que Spark utilise pour ses propres tests unitaires:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

Depuis Spark 2.3 SharedSparkSession est disponible:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

Mettre à jour:

Dépendance Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

Dépendance à SBT:

"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

J'aime créer un trait SparkSessionTestWrapper qui peut être mélangé pour tester des classes. L'approche de Shankar fonctionne, mais elle est trop lente pour les suites de tests contenant plusieurs fichiers.

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

Le trait peut être utilisé comme suit:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

Vérifiez le projet spark-spec pour un exemple réel qui utilise l'approche SparkSessionTestWrapper .


Merci d'avoir posé cette question en suspens. Pour une raison quelconque, quand il s’agit de Spark, tout le monde est tellement pris par l’analyse qu’il oublie les excellentes pratiques d’ingénierie des logiciels qui ont émergé ces 15 dernières années environ. C'est pourquoi nous nous efforçons de discuter des tests et de l'intégration continue (entre autres choses comme DevOps) dans notre cours.

Un coup d'oeil rapide sur la terminologie

Un véritable test unitaire signifie que vous avez un contrôle complet sur chaque composant du test. Il ne peut y avoir d'interaction avec les bases de données, les appels REST, les systèmes de fichiers ou même l'horloge système; Tout doit être "doublé" (par exemple raillé, écrasé, etc.) comme le dit Gerard Mezaros dans les modèles de test xUnit . Je sais que cela ressemble à de la sémantique, mais cela compte vraiment. Ne pas comprendre cela est l'une des principales raisons pour lesquelles vous observez des échecs de tests intermittents dans une intégration continue.

Nous pouvons toujours tester l'unité

Donc, étant donné cette compréhension, le test unitaire d'un RDD est impossible. Cependant, il y a toujours une place pour les tests unitaires lors du développement des analyses.

Considérons une opération simple:

rdd.map(foo).map(bar)

Ici, foo et bar sont des fonctions simples. Celles-ci peuvent être testées par unité de la manière habituelle, et elles doivent comporter autant de casiers en coin que possible. Après tout, pourquoi se soucient-ils de savoir où ils obtiennent leurs contributions, qu’il s’agisse d’un appareil de test ou d’un RDD ?

N'oubliez pas la coquille d'étincelle

Ce n'est pas un test en soi , mais dans ces premiers stades, vous devriez également expérimenter le shell Spark pour déterminer vos transformations et en particulier les conséquences de votre approche. Par exemple, vous pouvez examiner les plans de requêtes physiques et logiques, la stratégie de partitionnement et la préservation, ainsi que l'état de vos données avec de nombreuses fonctions différentes, telles que toDebugString , explain , glom , show , printSchema , etc. Je vais vous laisser les explorer.

Vous pouvez également définir votre maître sur local[2] dans le shell Spark et dans vos tests pour identifier tous les problèmes qui peuvent survenir uniquement lorsque vous commencez à distribuer le travail.

Tests d'intégration avec Spark

Maintenant, pour les trucs amusants.

Afin d' intégrer le test Spark après avoir eu confiance en la qualité de vos fonctions d'assistance et de la logique de transformation RDD / DataFrame , il est essentiel de faire certaines choses (indépendamment de l'outil de construction et du cadre de test):

  • Augmenter la mémoire JVM.
  • Activez le forking mais désactivez l'exécution en parallèle.
  • Utilisez votre SparkContext test pour accumuler vos tests d'intégration Spark dans des suites, et initialisez SparkContext avant tous les tests et arrêtez-le après tous les tests.

Avec ScalaTest, vous pouvez mélanger BeforeAndAfterAll (que je préfère en général) ou BeforeAndAfterEach comme @ShankarKoirala pour initialiser et supprimer les artefacts Spark. Je sais que c'est un endroit raisonnable pour faire une exception, mais je n'aime pas vraiment ces variantes mutables que vous devez utiliser.

Le modèle de prêt

Une autre approche consiste à utiliser le modèle de prêt .

Par exemple (en utilisant ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

Comme vous pouvez le constater, le modèle de prêt utilise des fonctions d'ordre supérieur pour "prêter" le SparkContext au test, puis pour en disposer après l'avoir fait.

Programmation orientée vers la souffrance (Merci Nathan)

C'est totalement une question de préférence, mais je préfère utiliser le modèle de prêt et me brancher les choses aussi longtemps que possible avant d'introduire un autre cadre. En plus d'essayer de rester légers, les frameworks ajoutent parfois beaucoup de «magie», ce qui rend les tests de débogage difficiles à raisonner. Donc, je prends une approche de la programmation axée sur la souffrance - où j'évite d'ajouter un nouveau cadre tant que la souffrance de ne pas en avoir est trop grande. Mais encore une fois, cela dépend de vous.

Le meilleur choix pour ce cadre alternatif est bien sûr le spark-testing-base comme @ShankarKoirala mentionné. Dans ce cas, le test ci-dessus ressemblerait à ceci:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

Notez que je n'ai rien eu à faire avec le SparkContext . SharedSparkContext m'a donné tout cela - avec sc comme SparkContext - gratuitement. Personnellement, je ne ferais pas appel à cette dépendance pour cette raison, car le modèle de prêt répond exactement à mes besoins. En outre, compte tenu de l’imprévisibilité des systèmes distribués, il peut être très difficile de suivre la magie du code source d’une bibliothèque tierce lorsque les choses ne se passent pas bien dans une intégration continue.

Maintenant, les HDFSClusterLike basées sur Hadoop, comme HDFSClusterLike et YARNClusterLike , sont YARNClusterLike . Mélanger ces traits peut vous faire économiser beaucoup de douleur. Les propriétés et les générateurs de type Scalacheck autre endroit qui fait briller le Scalacheck - en supposant que vous compreniez bien le fonctionnement des tests basés sur les propriétés et leur utilité. Mais encore une fois, je m'abstiens de l'utiliser jusqu'à ce que mes analyses et mes tests atteignent ce niveau de sophistication.

"Seulement un Sith traite en absolus." -- Obi Wan Kenobi

Bien entendu, vous n'avez pas à choisir l'un ou l'autre. Peut-être pourriez-vous utiliser l'approche du modèle de prêt pour la plupart de vos tests et pour tester quelques bases plus rigoureuses. Le choix n'est pas binaire; vous pouvez faire les deux.

Test d'intégration avec Spark Streaming

Enfin, je voudrais juste présenter un extrait de ce à quoi pourrait ressembler une configuration de test d'intégration SparkStreaming avec des valeurs en mémoire sans base de test :

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

C'est plus simple qu'il n'y paraît. Il ne fait que transformer une séquence de données en file d'attente pour alimenter le DStream . La plupart de ces solutions ne sont que de simples installations fonctionnant avec les API Spark. Quoi qu'il en soit, vous pouvez comparer cela avec StreamingSuiteBase comme vous le trouverez dans spark-testing-base, pour décider lequel vous préférez.

Cela pourrait être mon plus long message, alors je vais le laisser ici. J'espère que d'autres personnes émettront d'autres idées pour améliorer la qualité de nos analyses avec les mêmes pratiques d'ingénierie logicielle agiles qui ont amélioré tous les autres développements d'applications.

Et avec nos excuses pour la fiche sans vergogne, vous pouvez consulter notre cours Analytics avec Apache Spark , où nous abordons un grand nombre de ces idées et plus encore. Nous espérons avoir une version en ligne bientôt.


Vous pouvez écrire un test simple avec FunSuite et BeforeAndAfterEach comme ci-dessous

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

Vous n'avez pas besoin de créer une fonction en test, vous pouvez simplement écrire comme

test ("test name") {//implementation and assert}

Holden Karau a écrit très beau test spark-testing-base

Vous devez vérifier ci-dessous est un exemple simple

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

J'espère que cela t'aides!





apache-spark-sql