scala - स्पार्क 2.0+ में यूनिट परीक्षण कैसे लिखें?




unit-testing apache-spark (4)

मैं SparkSession परीक्षण ढांचे के साथ SparkSession का परीक्षण करने का एक उचित तरीका खोजने की कोशिश कर रहा हूं। हालांकि SparkContext लिए अच्छे उदाहरण प्रतीत SparkContext , लेकिन मुझे यह पता नहीं लगाया जा सकता कि SparkSession लिए काम करने के लिए एक समान उदाहरण कैसे प्राप्त किया SparkSession , भले ही इसे spark-testing-base में आंतरिक रूप से कई स्थानों पर उपयोग किया जाए। मुझे ऐसे समाधान का प्रयास करने में प्रसन्नता होगी जो स्पार्क-टेस्टिंग-बेस का उपयोग न करे, अगर यह वास्तव में यहां जाने का सही तरीका नहीं है।

सरल परीक्षण केस ( build.sbt साथ पूर्ण MWE प्रोजेक्ट ):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

जुनीट के साथ इसे चलाने का नतीजा लोड लाइन पर एक एनपीई है:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

ध्यान दें कि इससे कोई फर्क नहीं पड़ता कि फ़ाइल लोड हो रही है या नहीं; एक उचित रूप से कॉन्फ़िगर स्पार्क सत्र में, एक और समझदार त्रुटि फेंक दी जाएगी


आप FunSuite के साथ एक सरल परीक्षण लिख सकते हैं और पहले और बाद में नीचे की तरह

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

आपको परीक्षण में कोई फ़ंक्शन बनाने की आवश्यकता नहीं है जिसे आप बस लिख सकते हैं

test ("test name") {//implementation and assert}

होल्डन करौ ने वास्तव में अच्छा परीक्षण spark-testing-base लिखा है

आपको नीचे की जांच करना एक साधारण उदाहरण है

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

उम्मीद है की यह मदद करेगा!


इस उत्कृष्ट प्रश्न को वहां रखने के लिए धन्यवाद। कुछ कारणों से, जब स्पार्क की बात आती है, तो हर कोई एनालिटिक्स में इतना पकड़ा जाता है कि वे पिछले 15 सालों से उभरे महान सॉफ्टवेयर इंजीनियरिंग प्रथाओं के बारे में भूल जाते हैं। यही कारण है कि हम अपने पाठ्यक्रम में परीक्षण और निरंतर एकीकरण (देवोप्स जैसी अन्य चीजों के बीच) पर चर्चा करने के लिए एक बिंदु बनाते हैं।

शब्दावली पर एक त्वरित सहायक

एक वास्तविक इकाई परीक्षण का मतलब है कि आपके पास परीक्षण में प्रत्येक घटक पर पूर्ण नियंत्रण है। डेटाबेस, आरईएसटी कॉल, फाइल सिस्टम, या यहां तक ​​कि सिस्टम घड़ी के साथ कोई बातचीत नहीं हो सकती है; जेरार्ड मेज़ारोस ने इसे xUnit टेस्ट पैटर्न में रखा है, इसलिए सबकुछ "दोगुना" होना चाहिए (उदाहरण के लिए मजाक, स्टब्बेड इत्यादि)। मुझे पता है कि यह अर्थशास्त्र की तरह लगता है, लेकिन यह वास्तव में मायने रखता है। यह समझने में विफलता एक प्रमुख कारण है कि आप निरंतर एकीकरण में अंतःविषय परीक्षण विफलताओं को देखते हैं।

हम अभी भी यूनिट टेस्ट कर सकते हैं

इसलिए इस समझ को देखते हुए, एक RDD परीक्षण इकाई असंभव है। हालांकि, एनालिटिक्स विकसित करते समय यूनिट परीक्षण के लिए अभी भी एक जगह है।

एक सरल ऑपरेशन पर विचार करें:

rdd.map(foo).map(bar)

यहां foo और bar सरल कार्य हैं। उनको सामान्य तरीके से यूनिट परीक्षण किया जा सकता है, और वे जितना हो सके उतने कोने के मामलों के साथ होना चाहिए। आखिरकार, वे क्यों ख्याल रखते हैं कि वे कहां से इनपुट प्राप्त कर रहे हैं कि यह एक परीक्षण स्थिरता या RDD ?

स्पार्क शैल मत भूलना

यह प्रति परीक्षण नहीं कर रहा है , लेकिन इन शुरुआती चरणों में आपको अपने परिवर्तनों और विशेष रूप से आपके दृष्टिकोण के परिणामों को जानने के लिए स्पार्क खोल में भी प्रयोग करना चाहिए। उदाहरण के लिए, आप भौतिक और तार्किक क्वेरी योजनाओं, विभाजन रणनीति और संरक्षण की जांच कर सकते हैं, और आपके डेटा की स्थिति को कई अलग-अलग कार्यों जैसे कि toDebugString , explain , printSchema , show , printSchema आदि के साथ जांच सकते हैं। मैं आपको उनको तलाशने दूंगा।

आप स्पार्क शैल में अपने मास्टर को local[2] सेट कर सकते हैं और अपने परीक्षणों में किसी भी समस्या की पहचान करने के लिए जो केवल तब ही उत्पन्न हो सकते हैं जब आप काम वितरित करना शुरू कर देते हैं।

स्पार्क के साथ एकीकरण परीक्षण

अब मजेदार सामान के लिए।

अपने सहायक कार्यों और DataFrame / DataFrame परिवर्तन तर्क की गुणवत्ता में आत्मविश्वास महसूस करने के बाद स्पार्क को एकीकरण परीक्षण के लिए, कुछ चीजें करना महत्वपूर्ण है (निर्माण उपकरण और परीक्षण ढांचे के बावजूद):

  • जेवीएम मेमोरी बढ़ाएं।
  • फोर्किंग सक्षम करें लेकिन समांतर निष्पादन अक्षम करें।
  • अपने स्पार्क एकीकरण परीक्षण को सुइट्स में जमा करने के लिए अपने परीक्षण ढांचे का उपयोग करें, और सभी परीक्षणों से पहले SparkContext प्रारंभ करें और सभी परीक्षणों के बाद इसे रोक दें।

ScalaTest के साथ, आप पहले और BeforeAndAfterAll (जिसे मैं आम तौर पर पसंद करता हूं) में मिश्रण कर सकता हूं या BeforeAndAfterEach में @ShankarKoirala स्पार्क कलाकृतियों को शुरू करने और फाड़ने के लिए करता है। मुझे पता है कि यह अपवाद बनाने के लिए एक उचित जगह है, लेकिन मुझे वास्तव में उन म्यूटेबल vars पसंद नहीं हैं जिन्हें आप उपयोग करना चाहते हैं।

ऋण पैटर्न

एक और तरीका ऋण पैटर्न का उपयोग करना है।

उदाहरण के लिए (स्कैलाटेस्ट का उपयोग करके):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

जैसा कि आप देख सकते हैं, लोन पैटर्न परीक्षण के लिए SparkContext को "ऋण" करने के लिए उच्च-आदेश कार्यों का उपयोग करता है और उसके बाद इसे निपटाने के बाद इसका निपटान करता है।

पीड़ित-ओरिएंटेड प्रोग्रामिंग (धन्यवाद, नाथन)

यह पूरी तरह से वरीयता का मामला है, लेकिन जब तक मैं एक और ढांचा लाने से पहले कर सकता हूं, तब तक मैं लोन पैटर्न और तार चीजों का उपयोग करना पसंद करता हूं। हल्के वजन रखने की कोशिश करने के अलावा, ढांचे कभी-कभी बहुत सारे "जादू" को जोड़ते हैं जो डिबगिंग परीक्षण असफलताओं को तर्कसंगत बनाता है। तो मैं एक पीड़ित-ओरिएंटेड प्रोग्रामिंग दृष्टिकोण लेता हूं - जहां तक ​​मैं एक नया ढांचा जोड़ने से बचता हूं, जब तक कि दर्द न हो। लेकिन फिर, यह आप पर निर्भर है।

उस वैकल्पिक ढांचे के लिए सबसे अच्छा विकल्प निश्चित रूप से spark-testing-base है जिसका उल्लेख @ शंकरकोइराला ने किया है। उस स्थिति में, ऊपर दिया गया परीक्षण इस तरह दिखेगा:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

ध्यान दें कि SparkContext से निपटने के लिए मुझे कुछ भी करने की ज़रूरत नहीं SparkContextSharedSparkContext ने मुझे वह सब दिया - SparkContext रूप में SparkContext - मुफ्त में। व्यक्तिगत रूप से हालांकि मैं इस उद्देश्य के लिए इस निर्भरता को नहीं लाऊंगा क्योंकि ऋण पैटर्न वास्तव में इसके लिए आवश्यक है। साथ ही, वितरित प्रणालियों के साथ होने वाली इतनी अप्रत्याशितता के साथ, यह एक वास्तविक दर्द हो सकता है जो किसी तीसरे पक्ष की लाइब्रेरी के स्रोत कोड में होता है जो लगातार एकीकरण में गलत हो जाता है।

अब जहां स्पार्क-टेस्टिंग-बेस वास्तव में चमकता है, HDFSClusterLike -आधारित HDFSClusterLike जैसे HDFSClusterLike और YARNClusterLike । उन लक्षणों को मिलाकर वास्तव में आपको बहुत सारे सेटअप दर्द बचा सकते हैं। एक और जगह जहां यह चमकता है, Scalacheck -जैसी गुणों और जेनरेटर के साथ है - निश्चित रूप से आप समझते हैं कि संपत्ति-आधारित परीक्षण कैसे काम करता है और यह उपयोगी क्यों है। लेकिन फिर से, मैं व्यक्तिगत रूप से इसका उपयोग तब तक बंद कर दूंगा जब तक कि मेरे एनालिटिक्स और मेरे परीक्षण उस परिष्कार के स्तर तक नहीं पहुंच जाते।

"केवल एक सिथ पूर्ण में सौदों।" ओबी-वान केनोबी

बेशक, आपको एक या दूसरे को चुनना नहीं है। शायद आप अपने अधिकांश परीक्षणों और स्पार्क-परीक्षण-बेस के लिए केवल कुछ, अधिक कठोर परीक्षणों के लिए ऋण पैटर्न दृष्टिकोण का उपयोग कर सकते हैं। पसंद द्विआधारी नहीं है; आप दोनों कर सकते हैं।

स्पार्क स्ट्रीमिंग के साथ एकीकरण परीक्षण

आखिरकार, मैं स्पार्क- स्ट्रीमिंग एकीकरण परीक्षण सेटअप के साथ स्पार्कस्ट्रीमिंग एकीकरण परीक्षण सेटअप के स्निपेट को प्रस्तुत करना चाहता हूं, स्पार्क-परीक्षण-बेस के बिना:

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

यह दिखने से आसान है। यह वास्तव में DStream को खिलाने के लिए डेटा की अनुक्रम को कतार में DStream । इसमें से अधिकांश वास्तव में बॉयलरप्लेट सेटअप है जो स्पार्क एपीआई के साथ काम करता है। भले ही आप StreamingSuiteBase साथ तुलना कर सकते हैं जैसा कि आप पसंद करते हैं यह तय करने के लिए स्पार्क-टेस्टिंग-बेस में पाए जाते हैं।

यह मेरी सबसे लंबी पोस्ट कभी हो सकती है, इसलिए मैं इसे यहां छोड़ दूंगा। मुझे आशा है कि अन्य लोग अन्य विचारों के साथ झुकाएं ताकि हमारे एनालिटिक्स की गुणवत्ता में सुधार करने में मदद मिल सके, जो एक ही चुस्त सॉफ्टवेयर इंजीनियरिंग प्रथाओं के साथ है जो अन्य सभी अनुप्रयोग विकास में सुधार हुआ है।

और लापरवाही प्लग के लिए क्षमा मांगने के साथ, आप अपाचे स्पार्क के साथ हमारे पाठ्यक्रम Analytics को देख सकते हैं, जहां हम इन विचारों और अधिक को संबोधित करते हैं। हम जल्द ही एक ऑनलाइन संस्करण होने की उम्मीद है।


मैं नीचे कोड के साथ समस्या हल कर सकता है

स्पार्क-हाइव निर्भरता प्रोजेक्ट पोम में जोड़ा जाता है

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }

स्पार्क 1.6 के बाद से आप SharedSQLContext उपयोग कर सकते हैं कि स्पार्क अपने स्वयं के यूनिट परीक्षणों के लिए उपयोग करता है:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

चूंकि स्पार्क 2.3 SharedSparkSession उपलब्ध है:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

अद्यतन करें:

मेवेन निर्भरता:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

एसबीटी निर्भरता:

"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"




apache-spark-sql