apache spark - स्पार्क कोड संगठन और सर्वोत्तम प्रथाओं



apache-spark functional-programming (1)

मुझे लगता है कि आप Apache Spark को सब्सक्राइब कर सकते हैं, यूट्यूब पर databricks चैनल, अधिक सुन सकते हैं और अधिक जान सकते हैं, विशेष रूप से दूसरों से अनुभव और सबक के लिए।

यहाँ कुछ वीडियो सुझाए गए हैं:

और मैंने अपने गिथब और ब्लॉग पर इसे पोस्ट और अपडेट किया है:

आशा है कि यह आपकी मदद कर सकता है ~

इसलिए, कोड पुन: उपयोग, डिजाइन पैटर्न और हमेशा ध्यान में रखी जाने वाली सर्वोत्तम प्रथाओं के साथ एक वस्तु उन्मुख दुनिया में कई साल बिताने के बाद, मैं खुद को कोड संगठन और स्पार्क की दुनिया में कोड पुन: उपयोग के साथ कुछ संघर्ष कर रहा हूं।

यदि मैं पुन: प्रयोज्य तरीके से कोड लिखने की कोशिश करता हूं, तो यह लगभग हमेशा एक प्रदर्शन लागत के साथ आता है और मैं इसे फिर से लिखना चाहता हूं जो मेरे विशेष उपयोग के मामले के लिए इष्टतम है। यह निरंतर "यह लिखें कि इस विशेष उपयोग के मामले में इष्टतम क्या है" कोड संगठन को भी प्रभावित करता है, क्योंकि विभिन्न वस्तुओं या मॉड्यूल में कोड को विभाजित करना मुश्किल होता है जब "यह वास्तव में एक साथ होता है" और मैं इस प्रकार बहुत कम "भगवान" वस्तु के साथ लंबे समय तक रहता हूं जटिल परिवर्तनों की श्रृंखला। वास्तव में, मैं अक्सर सोचता हूं कि अगर मैंने अधिकतर स्पार्क कोड पर ध्यान दिया होता तो मैं अब वापस लिख रहा हूं जब मैं ऑब्जेक्ट ओरिएंटेड दुनिया में काम कर रहा था, तो मैंने इसे "स्पेगेटी कोड" के रूप में जीता और खारिज कर दिया।

मैंने इंटरनेट को ऑब्जेक्ट ओरिएंटेड दुनिया की सर्वोत्तम प्रथाओं के समतुल्य खोजने का प्रयास किया है, लेकिन बहुत अधिक भाग्य के बिना। मैं कार्यात्मक प्रोग्रामिंग के लिए कुछ "सर्वोत्तम अभ्यास" पा सकता हूं, लेकिन स्पार्क सिर्फ एक अतिरिक्त परत जोड़ता है, क्योंकि प्रदर्शन यहां एक प्रमुख कारक है।

तो मेरा आपसे प्रश्न है, क्या आपमें से किसी ने स्पार्क गुरुओं को स्पार्क कोड लिखने के लिए कुछ सर्वोत्तम अभ्यास दिए हैं, जिन्हें आप सुझा सकते हैं?

संपादित करें

जैसा कि एक टिप्पणी में लिखा गया था, मैंने वास्तव में किसी से भी इस समस्या का समाधान करने के बारे में जवाब देने की उम्मीद नहीं की थी, बल्कि मैं उम्मीद कर रहा था कि इस समुदाय में कोई व्यक्ति मार्टिन फाउलर के कुछ प्रकारों में आया था, जिन्होंने कहीं न कहीं लेख या ब्लॉग पोस्ट लिखे थे स्पार्क की दुनिया में कोड संगठन के साथ समस्याओं का समाधान कैसे करें।

@DanielDarabos ने सुझाव दिया कि मैं एक ऐसी स्थिति का उदाहरण रख सकता हूं जहां कोड संगठन और प्रदर्शन परस्पर विरोधी हैं। जब मुझे पता चलता है कि मेरे रोजमर्रा के काम में मुझे अक्सर यह समस्या होती है, तो मुझे इसे एक न्यूनतम न्यूनतम उदाहरण में उबालना थोड़ा मुश्किल लगता है;) लेकिन मैं कोशिश करूँगा।

ऑब्जेक्ट ओरिएंटेड दुनिया में, मैं सिंगल रिस्पॉन्सिबिलिटी प्रिंसिपल का बहुत बड़ा प्रशंसक हूं, इसलिए मैं यह सुनिश्चित करूंगा कि मेरे तरीके केवल एक चीज के लिए जिम्मेदार थे। यह उन्हें पुन: प्रयोज्य और आसानी से परीक्षण योग्य बनाता है। इसलिए अगर मुझे किसी सूची में कुछ संख्याओं के योग की गणना करने, कहने, (कुछ मानदंडों का मिलान करने) की गणना करनी थी और मुझे उसी संख्या के औसत की गणना करनी थी, तो मैं निश्चित रूप से दो विधियों का निर्माण करूंगा - एक जिसने योग की गणना की और एक की। औसत की गणना की। ऐशे ही:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

मैं स्पार्क में SRP का सम्मान करना जारी रख सकता हूं:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

लेकिन क्योंकि मेरे df में अरबों पंक्तियाँ हो सकती हैं, बल्कि मुझे filter दो बार करना नहीं होगा। वास्तव में, प्रदर्शन को सीधे EMR लागत के साथ जोड़ा जाता है, इसलिए मैं वास्तव में ऐसा नहीं चाहता हूं। इसे दूर करने के लिए, मैं इस प्रकार एसआरपी का उल्लंघन करने का फैसला करता हूं और बस दो कार्यों को एक में डाल देता हूं और सुनिश्चित करता हूं कि मैं देश-फ़िल्टर किए गए DataFrame इस तरह बनी रहती हूं:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

अब, अगर वास्तविक जीवन में इसका सामना करना पड़ा है, तो इसका एक बड़ा सरलीकरण है। यहाँ मैं इसे योग और एवी फ़ंक्शंस (जो कि अधिक SRP भी होगा) को सौंपने से पहले df को फ़िल्टर करके और जारी रखकर आसानी से हल कर सकता है, लेकिन वास्तविक जीवन में बार-बार होने वाली कई मध्यवर्ती गणनाएँ हो सकती हैं, जिनकी आवश्यकता बार-बार पड़ती है। दूसरे शब्दों में, यहाँ filter फंक्शन केवल एक ऐसी चीज़ का एक सरल उदाहरण बनाने का एक प्रयास है जो लगातार बने रहने से लाभान्वित होगा। वास्तव में, मुझे लगता है कि persist कॉल करना यहां एक कीवर्ड है। दृढ़ता से कॉल करने से मेरे काम में तेजी आएगी, लेकिन लागत यह है कि मुझे सभी कोड को कसकर जोड़े रखना होगा जो कि जारी DataFrame पर निर्भर करता है - भले ही वे तार्किक रूप से अलग हों।





code-organization