How do I use a window specification and join condition per column values? (2)

Since you have already selected the latest rows from your DF2 with the Window function over the three primary keys OrganizationId, AnnualPeriodId and InterimPeriodId, you already have the rows to insert for those three primary keys. You only need the rows with I or O in the FFAction_1 column.

And, as you said, you want to delete the rows of DF1 where FFAction_1 = O or D, using the two primary keys OrganizationId and InterimPeriodId. For that, you need the rows of DF2 that have D or O in the FFAction_1 column.

After you have separated DF2 with the logic above, the first step is to delete rows from DF1 via a join, and then append the remaining filtered rows of DF2 with union.

Here is the complete solution:

//------------------------------- filtering only the latest rows from the incremental data ------------------------------
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


//-----------------separating the incremental df for insert, deletion and overwrite----------------

//---------------insert rows are selected -------------------------------
//insert a row if I is detected and if O is found then first delete and then insert
val insertdf = latestForEachKey.filter($"FFAction" === "I|!|" || $"FFAction" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

//------------------deleted rows with primary key  "OrganizationId", "InterimPeriodId"------------------
// delete rows from the parent if either D or O is found in the incremental data
val deletedf = latestForEachKey.filter($"FFAction" === "D|!|" || $"FFAction" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

//join by two primary keys for deletion and delete from the parent dataframe
val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left")
  .filter($"Delete".isNull)
  .drop("Delete")

//final required output 
dfMainOutput.union(insertdf).show(false)
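
Side note: the left join plus isNull filter above is effectively an anti join. Assuming you are on Spark 2.0 or later, the built-in left_anti join type expresses the same deletion step a bit more directly; a minimal sketch using the same dataframes as above:

// equivalent deletion step using a left_anti join instead of left join + filter on isNull
val dfMainOutputAnti = df1resultFinalWithYear
  .join(deletedf.select("OrganizationId", "InterimPeriodId"), Seq("OrganizationId", "InterimPeriodId"), "left_anti")

dfMainOutputAnti.union(insertdf).show(false)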

Here is my DF1

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction
4295858898|^|204|^|205|^|1|^|I|!|
4295858898|^|204|^|208|^|2|^|I|!|
4295858898|^|204|^|209|^|2|^|I|!|
4295858898|^|204|^|211|^|3|^|I|!|
4295858898|^|204|^|212|^|3|^|I|!|
4295858898|^|204|^|214|^|4|^|I|!|
4295858898|^|204|^|215|^|4|^|I|!|
4295858898|^|206|^|207|^|1|^|I|!|
4295858898|^|206|^|210|^|2|^|I|!|
4295858898|^|206|^|213|^|3|^|I|!|

Here is my DF2

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2002|^|1511224917595|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917596|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917597|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917598|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917599|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917600|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917601|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917602|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917603|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917604|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917605|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917606|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917607|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917608|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917609|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917610|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917611|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917612|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917613|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917614|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917615|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917616|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917617|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917618|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917619|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917620|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917621|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1511224917622|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1511224917642|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917643|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917644|^|4295858941|^|null|^|37|^|null|^|D|!|

I want to implement the join based on the value of a column.

This is, for example, what I am trying to achieve in Spark Scala, but I don't know how to implement it:

If FFAction_1 = I in DF2, then apply the condition below

(join and partitionBy on the three columns "OrganizationId", "AnnualPeriodId", "InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) 

val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId","AnnualPeriodId","InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"),
      lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

If FFAction_1 = O or D, then apply the condition below

(join and partitionBy on the two columns "OrganizationId", "InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) 

val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId","AnnualPeriodId","InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"),
      lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

Below is my complete code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

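// UDFs that derive the DataPartition and PartitionYear values from dot-separated tokens of the input file name (used with input_file_name below)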
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))


//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


 //------------------------------- filtering only the latest from the incremental data ------------------------------

    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")


    val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------

    //---------------insert rows are selected -------------------------------
    //insert a row if I is detected and if O is found then first delete and then insert

    val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

    //------------------deleted rows with primary key  "OrganizationId", "InterimPeriodId"------------------
    // delete rows from the parent if either D or O is found in the incremental data
    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

    //join by two primary keys for deletion and delete from the parent dataframe
    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

val dfToSave=dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)


    dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

   val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","PartitionYear").count

  FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")

DISCLAIMER Somehow this question and the other one I have just answered seem to be duplicates, so one of them will soon be marked as such, or we will find the difference between them and this disclaimer will disappear. We will see.

Given the requirement to pick the final window specification and join condition based on the values in the FFAction_1 column, I would first filter and then decide which window aggregation and join to use.

val df1 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df1.csv").
  select("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction")
scala> df1.show
+--------------+--------------+---------------+-------------+--------+
|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber|FFAction|
+--------------+--------------+---------------+-------------+--------+
|    4295858898|           204|            205|            1|       I|
|    4295858898|           204|            208|            2|       I|
|    4295858898|           204|            209|            2|       I|
|    4295858898|           204|            211|            3|       I|
|    4295858898|           204|            212|            3|       I|
|    4295858898|           204|            214|            4|       I|
|    4295858898|           204|            215|            4|       I|
|    4295858898|           206|            207|            1|       I|
|    4295858898|           206|            210|            2|       I|
|    4295858898|           206|            213|            3|       I|
+--------------+--------------+---------------+-------------+--------+

The right side of the join is quite similar in "shape".

val df2 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df2.csv").
  select("DataPartition_1", "PartitionYear_1", "TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber_1", "FFAction_1")
scala> df2.show
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|  DataPartition_1|PartitionYear_1|    TimeStamp|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber_1|FFAction_1|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|SelfSourcedPublic|           2002|1510725106270|    4295858941|            24|             25|              4|         O|
|SelfSourcedPublic|           2002|1510725106271|    4295858941|            24|             25|              5|         O|
|SelfSourcedPublic|           2003|1510725106272|    4295858941|            30|             31|              2|         O|
|SelfSourcedPublic|           2003|1510725106273|    4295858941|            30|             31|              3|         O|
|SelfSourcedPublic|           2001|1510725106293|    4295858941|             5|             20|              2|         O|
|SelfSourcedPublic|           2001|1510725106294|    4295858941|             5|             21|              3|         O|
|SelfSourcedPublic|           2002|1510725106295|    4295858941|             1|             22|              4|         O|
|SelfSourcedPublic|           2002|1510725106296|    4295858941|             1|             23|              5|         O|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         I|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         D|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+

With the datasets above, I would filter to see whether there is at least one I in df2's FFAction_1 column, and pick the proper window specification and join condition accordingly.

The trick is to use the join operator followed by the where operator (or filter) so that you can decide which join condition to use.

val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty
val (windowSpec, joinCond) = if (noIs) {
  (windowSpecForOs, joinForOs) 
} else {
  (windowSpecForIs, joinForIs)
}
val latestForEachKey = df2result.withColumn("rank", rank() over windowSpec)
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey).where(joinCond)
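
The snippet above leaves windowSpecForIs, windowSpecForOs, joinForIs and joinForOs undefined; those are placeholders you have to fill in for your data. One possible sketch, assuming the key columns from the question and expressing the conditions against df2result (from which latestForEachKey is derived), so they can be built before latestForEachKey exists:

// hypothetical definitions; key columns follow the question, adjust as needed
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType

// variant used when I rows are present: match on all three primary keys
val windowSpecForIs = Window.partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)
val joinForIs = df1resultFinalWithYear("OrganizationId") === df2result("OrganizationId") &&
  df1resultFinalWithYear("AnnualPeriodId") === df2result("AnnualPeriodId") &&
  df1resultFinalWithYear("InterimPeriodId") === df2result("InterimPeriodId")

// variant used when only O/D rows are present: match on the two primary keys
val windowSpecForOs = Window.partitionBy("OrganizationId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)
val joinForOs = df1resultFinalWithYear("OrganizationId") === df2result("OrganizationId") &&
  df1resultFinalWithYear("InterimPeriodId") === df2result("InterimPeriodId")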



