scala - स्पार्क डेटाफ्रेम: इंडेक्स कॉलम कैसे जोड़ें: एका डिस्ट्रिब्यूटेड डेटा इंडेक्स




apache-spark dataframe (4)

मैं एक सीएसवी फ़ाइल से डेटा पढ़ता हूं, लेकिन इसमें इंडेक्स नहीं है।

मैं 1 से पंक्ति की संख्या में एक कॉलम जोड़ना चाहता हूं।

मुझे क्या करना चाहिए, धन्यवाद (scala)


जैसा कि राम ने कहा, zippedwithindex , monotonically बढ़ती आईडी से बेहतर है, आईडी आपको लगातार पंक्ति संख्या की आवश्यकता है। इसे आज़माएं (PySpark पर्यावरण):

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

जहाँ ओरिजिनल_डॉटफ़्रेम वह डेटाफ़्रेम है जिसमें आपको इंडेक्स को जोड़ना होता है और row_with_index कॉलम इंडेक्स के साथ नया स्कीमा होता है जिसे आप इस प्रकार लिख सकते हैं

row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)

यहाँ, year_week_number , year_week_number , year_period_number और realization मेरे year_period_number के कॉलम थे। आप अपने कॉलम के नामों के साथ नाम बदल सकते हैं। index नया कॉलम नाम है जिसे आपको पंक्ति संख्याओं के लिए जोड़ना था।


स्काला के साथ आप उपयोग कर सकते हैं:

import org.apache.spark.sql.functions._ 

df.withColumn("id",monotonicallyIncreasingId)

आप इस छूट और स्काला docs उल्लेख कर सकते हैं।

Pyspark के साथ आप उपयोग कर सकते हैं:

from pyspark.sql.functions import monotonically_increasing_id 

df_index = df.select("*").withColumn("id", monotonically_increasing_id())

अनुक्रमिक आईडी कॉलम कैसे प्राप्त करें :

from pyspark.sql.functions import desc, row_number, monotonically_increasing_id

df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

ध्यान दें कि row_number () 1 से शुरू होता है, इसलिए यदि आप 0-अनुक्रमित कॉलम चाहते हैं तो 1 से घटाएं


नोट : ऊपर दिए गए दृष्टिकोण अनुक्रम संख्या नहीं देते हैं, लेकिन यह बढ़ती आईडी देते हैं।

ऐसा करने का सरल तरीका और अनुक्रमित का क्रम सुनिश्चित करना नीचे की तरह है .. zipWithIndex

नमूना डेटा।

+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Jackela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+
    package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
  * DistributedDataIndex : Program to index an RDD  with
  */
object DistributedDataIndex extends App with Logging {

  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
      , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show
  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
    * Add Column Index to dataframe
    */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}

परिणाम :

+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Jackela Kowski     |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+




apache-spark-sql