scala स्पार्क 2.0+ में यूनिट परीक्षण कैसे लिखें?




unit-testing apache-spark (4)

मुझे SparkSessionTestWrapper विशेषता बनाना पसंद है जिसे परीक्षण कक्षाओं में मिश्रित किया जा सकता है। शंकर का दृष्टिकोण काम करता है, लेकिन यह कई फाइलों के साथ परीक्षण सूट के लिए निषिद्ध रूप से धीमा है।

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

इस प्रकार का उपयोग इस प्रकार किया जा सकता है:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

एक वास्तविक जीवन उदाहरण के लिए spark-spec प्रोजेक्ट की जांच करें जो spark-spec SparkSessionTestWrapper दृष्टिकोण का उपयोग करता है।

मैं SparkSession परीक्षण ढांचे के साथ SparkSession का परीक्षण करने का एक उचित तरीका खोजने की कोशिश कर रहा हूं। हालांकि SparkContext लिए अच्छे उदाहरण प्रतीत SparkContext , लेकिन मुझे यह पता नहीं लगाया जा सकता कि SparkSession लिए काम करने के लिए एक समान उदाहरण कैसे प्राप्त किया SparkSession , भले ही इसे spark-testing-base में आंतरिक रूप से कई स्थानों पर उपयोग किया जाए। मुझे ऐसे समाधान का प्रयास करने में प्रसन्नता होगी जो स्पार्क-टेस्टिंग-बेस का उपयोग न करे, अगर यह वास्तव में यहां जाने का सही तरीका नहीं है।

सरल परीक्षण केस ( build.sbt साथ पूर्ण MWE प्रोजेक्ट ):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

जुनीट के साथ इसे चलाने का नतीजा लोड लाइन पर एक एनपीई है:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

ध्यान दें कि इससे कोई फर्क नहीं पड़ता कि फ़ाइल लोड हो रही है या नहीं; एक उचित रूप से कॉन्फ़िगर स्पार्क सत्र में, एक और समझदार त्रुटि फेंक दी जाएगी


मैं नीचे कोड के साथ समस्या हल कर सकता है

स्पार्क-हाइव निर्भरता प्रोजेक्ट पोम में जोड़ा जाता है

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }

स्पार्क 1.6 के बाद से आप SharedSQLContext उपयोग कर सकते हैं कि स्पार्क अपने स्वयं के यूनिट परीक्षणों के लिए उपयोग करता है:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

चूंकि स्पार्क 2.3 SharedSparkSession उपलब्ध है:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

अद्यतन करें:

मेवेन निर्भरता:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

एसबीटी निर्भरता:

"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

आप FunSuite के साथ एक सरल परीक्षण लिख सकते हैं और पहले और बाद में नीचे की तरह

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

आपको परीक्षण में कोई फ़ंक्शन बनाने की आवश्यकता नहीं है जिसे आप बस लिख सकते हैं

test ("test name") {//implementation and assert}

होल्डन करौ ने वास्तव में अच्छा परीक्षण spark-testing-base लिखा है

आपको नीचे की जांच करना एक साधारण उदाहरण है

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

उम्मीद है की यह मदद करेगा!





apache-spark-sql