scala - कॉलम की उच्च संख्या वाले डेटासेट के लिए अपाचे स्पार्क में एमएल पाइपलाइन बनाने का सबसे अच्छा तरीका




apache-spark apache-spark-mllib (2)

janino त्रुटि अनुकूलक प्रक्रिया के दौरान बनाए गए स्थिर चर की संख्या के कारण है। JVM में निरंतर चर की अधिकतम सीमा है ((2 ^ 16) -1)। यदि यह सीमा पार हो गई है, तो आपको Constant pool for class ... has grown past JVM limit of 0xFFFF मिल Constant pool for class ... has grown past JVM limit of 0xFFFF

SPARK-18016 जो इस मुद्दे को ठीक करेगी वह SPARK-18016 , लेकिन इस समय भी यह प्रगति पर है।

आपका कोड VectorAssembler चरण के दौरान असफल होने की संभावना है, जब इसे एक अनुकूलन कार्य के दौरान हजारों कॉलम के विरुद्ध प्रदर्शन करना होता है।

इस समस्या के लिए मैंने जो कामकाज विकसित किया है वह स्तंभों के सबसेट के खिलाफ काम करके "वेक्टरों का वेक्टर" बनाना है और फिर एकवचन फीचर वेक्टर बनाने के लिए परिणाम अंत में एक साथ ला रहा है। यह किसी एकल अनुकूलन कार्य को JVM निरंतर सीमा से अधिक होने से रोकता है। यह सुरुचिपूर्ण नहीं है, लेकिन मैंने इसे 10k कॉलम रेंज तक पहुंचने वाले डेटासेट पर उपयोग किया है।

यह विधि आपको अभी भी एक पाइपलाइन रखने की अनुमति देती है, हालांकि इसे काम करने के लिए कुछ अतिरिक्त कदमों की आवश्यकता होती है (उप-वैक्टर बनाना)। उप-वैक्टर से फीचर वेक्टर बनाने के बाद, यदि आप वांछित हो तो मूल स्रोत कॉलम छोड़ सकते हैं।

उदाहरण कोड:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(नोट: कॉलम सूचियां बनाने की विधि को प्रोग्रामिक रूप से किया जाना चाहिए, लेकिन मैंने इस उदाहरण को अवधारणा को समझने के लिए सरल रखा है।)

मैं ~ 2000 सुविधाओं के साथ डेटासेट पर स्पार्क 2.1.1 के साथ काम कर रहा हूं और कुछ बुनियादी ट्रांसफॉर्मर और क्लासिफायरफायर युक्त मूल एमएल पाइपलाइन बनाने की कोशिश कर रहा हूं।

आइए सादगी के लिए मान लें कि जिस पाइपलाइन के साथ मैं काम कर रहा हूं जिसमें एक वेक्टर एस्सेबलर, स्ट्रिंगइंडेक्सर और क्लासिफायर शामिल है, जो काफी आम उपयोगकेस होगा।

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

यदि पाइपलाइन चरणों को एक ट्रांसफॉर्मर पाइपलाइन (वेक्टर एस्सेबलर + स्ट्रिंग इंडेक्सर) और दूसरी क्लासिफायरफायर पाइपलाइन में विभाजित किया जाता है, और यदि दोनों पाइपलाइनों के बीच अनावश्यक कॉलम गिराए जाते हैं, तो प्रशिक्षण सफल होता है। इसका मतलब मॉडल का पुन: उपयोग करने के लिए है, दो पाइपलाइन मॉडलों को प्रशिक्षण के बाद बचाया जाना चाहिए और मध्यस्थ प्रीप्रोकैसिंग चरण पेश करना है।

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

(Imho) अधिक क्लीनर समाधान सभी पाइपलाइन चरणों को एक पाइपलाइन में विलय करना होगा।

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

हालांकि, सभी पाइपलाइनस्टेज को एक पाइपलाइन में डालने से निम्नलिखित अपवाद होता है, शायद this मुद्दे के कारण this पीआर अंततः हल हो जाएगा:

त्रुटि कोड जनरेटर: संकलित करने में विफल: org.codehaus.janino.JaninoRuntimeException: कक्षा org.apache.spark.sql.catalyst.expressions.GeneratedClass $ के लिए लगातार पूल विशिष्ट UnnsafeProjection 0xFFFF की JVM सीमा से अधिक हो गया है

इसका कारण यह है कि वेक्टर एस्सेबलर डेटाफ्रेम में डेटा की मात्रा को प्रभावी ढंग से दोगुना करता है (क्योंकि इस उदाहरण में) कोई ट्रांसफॉर्मर नहीं है जो अनावश्यक कॉलम को छोड़ सकता है। ( स्पार्क पाइपलाइन वेक्टर असेंबलर अन्य कॉलम ड्रॉप ) देखें

उदाहरण के लिए गोल्ब डेटासेट पर काम करता है और निम्नलिखित प्रीप्रोकैसिंग चरण आवश्यक हैं:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

जैसा कि मैं स्पार्क के लिए नया हूं, मुझे यकीन नहीं है कि इस मुद्दे को हल करने का सबसे अच्छा तरीका क्या होगा। क्या आप सुझाव देंगे ...

  1. एक नया ट्रांसफॉर्मर बनाने के लिए, जो कॉलम छोड़ देता है और जिसे पाइपलाइन में शामिल किया जा सकता है?
  2. दोनों पाइपलाइनों को विभाजित करें और मध्यस्थ कदम पेश करें
  3. और कुछ? :)

या क्या मुझे कुछ महत्वपूर्ण (पाइपलाइन कदम, पीआर, इत्यादि) याद आ रही है जो इस मुद्दे को हल करेगी?

संपादित करें:

मैंने एक नया ट्रांसफार्मर DroppingVectorAssembler लागू किया, जो अनावश्यक कॉलम छोड़ देता है, हालांकि, वही अपवाद फेंक दिया जाता है।

इसके अलावा, spark.sql.codegen.wholeStage को false सेट करने से समस्या हल नहीं होती है।


janino त्रुटि आप प्राप्त कर रहे हैं वह है क्योंकि सुविधा सेट के आधार पर, जेनरेट कोड बड़ा हो जाता है।

मैं चरणों को अलग-अलग पाइपलाइनों में अलग करता हूं और अनावश्यक सुविधाओं को छोड़ देता StringIndexer , StringIndexer और OneHotEncoder जैसे इंटरमीडिएट मॉडल को StringIndexer OneHotEncoder और भविष्यवाणी चरण के दौरान उन्हें लोड करता हूं, जो सहायक भी है क्योंकि डेटा की भविष्यवाणी के लिए रूपांतरण तेज होगा।

अंत में, आपको feature vector VectorAssembler चरण चलाने के बाद फीचर कॉलम रखने की आवश्यकता नहीं है क्योंकि यह feature vector और label कॉलम में सुविधाओं को बदल देती है और यही आपको भविष्यवाणियों को चलाने की ज़रूरत है।

इंटरमीडिएट चरणों की बचत के साथ स्कैला में पाइपलाइन का उदाहरण- (पुराना स्पार्क एपीआई)

साथ ही, यदि आप 1.6.0 की तरह स्पार्क के पुराने संस्करण का उपयोग कर रहे हैं, तो आपको पैच किए गए संस्करण यानी 2.1.1 या 2.2.0 या 1.6.4 की जांच करने की आवश्यकता है या अन्यथा आप 400 फीचर कॉलम के साथ भी Janino त्रुटि को Janino