python - collect_list عن طريق الحفاظ على النظام بناءً على متغير آخر




apache-spark pyspark (3)

أحاول إنشاء عمود جديد من القوائم في Pyspark باستخدام تجميع تجميعي على مجموعة الأعمدة الحالية. يتم توفير مثال لإطار إدخال البيانات أدناه:

------------------------
id | date        | value
------------------------
1  |2014-01-03   | 10 
1  |2014-01-04   | 5
1  |2014-01-05   | 15
1  |2014-01-06   | 20
2  |2014-02-10   | 100   
2  |2014-03-11   | 500
2  |2014-04-15   | 1500

الناتج المتوقع هو:

id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]

يتم فرز القيم داخل القائمة حسب التاريخ.

لقد حاولت استخدام collect_list على النحو التالي:

from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))

لكن collect_list لا يضمن الطلب حتى لو قمت بفرز إطار بيانات الإدخال حسب التاريخ قبل التجميع.

هل يمكن لشخص ما المساعدة في كيفية القيام بالتجميع عن طريق الحفاظ على الترتيب بناءً على متغير (تاريخ) ثانٍ؟


إذا قمت بجمع كل من التواريخ والقيم كقائمة ، يمكنك فرز العمود الناتج وفقًا للتاريخ باستخدام و udf ، ثم الاحتفاظ فقط بالقيم في النتيجة.

import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df.groupby("id") \
               .agg(F.collect_list(F.struct("date", "value")) \
               .alias("list_col"))

# define udf
def sorter(l):
  res = sorted(l, key=operator.itemgetter(0))
  return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df.select("id", sort_udf("list_col") \
  .alias("sorted_list")) \
  .show(truncate = False)
+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+

كان السؤال يتعلق بـ PySpark ولكن قد يكون من المفيد الحصول عليه أيضًا مع Scala Spark.

دعونا نستعد اختبار قاعدة البيانات:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{ Window, UserDefinedFunction}

import java.sql.Date
import java.time.LocalDate

val spark: SparkSession = ...

// Out test data set
val data: Seq[(Int, Date, Int)] = Seq(
  (1, Date.valueOf(LocalDate.parse("2014-01-03")), 10),
  (1, Date.valueOf(LocalDate.parse("2014-01-04")), 5),
  (1, Date.valueOf(LocalDate.parse("2014-01-05")), 15),
  (1, Date.valueOf(LocalDate.parse("2014-01-06")), 20),
  (2, Date.valueOf(LocalDate.parse("2014-02-10")), 100),
  (2, Date.valueOf(LocalDate.parse("2014-02-11")), 500),
  (2, Date.valueOf(LocalDate.parse("2014-02-15")), 1500)
)

// Create dataframe
val df: DataFrame = spark.createDataFrame(data)
  .toDF("id", "date", "value")
df.show()
//+---+----------+-----+
//| id|      date|value|
//+---+----------+-----+
//|  1|2014-01-03|   10|
//|  1|2014-01-04|    5|
//|  1|2014-01-05|   15|
//|  1|2014-01-06|   20|
//|  2|2014-02-10|  100|
//|  2|2014-02-11|  500|
//|  2|2014-02-15| 1500|
//+---+----------+-----+

استخدم UDF

// Group by id and aggregate date and value to new column date_value
val grouped = df.groupBy(col("id"))
  .agg(collect_list(struct("date", "value")) as "date_value")
grouped.show()
grouped.printSchema()
// +---+--------------------+
// | id|          date_value|
// +---+--------------------+
// |  1|[[2014-01-03,10],...|
// |  2|[[2014-02-10,100]...|
// +---+--------------------+

// udf to extract data from Row, sort by needed column (date) and return value
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
  rows.map { case Row(date: Date, value: Int) => (date, value) }
    .sortBy { case (date, value) => date }
    .map { case (date, value) => value }
})

// Select id and value_list
val r1 = grouped.select(col("id"), sortUdf(col("date_value")).alias("value_list"))
r1.show()
// +---+----------------+
// | id|      value_list|
// +---+----------------+
// |  1| [10, 5, 15, 20]|
// |  2|[100, 500, 1500]|
// +---+----------------+

استخدام النافذة

val window = Window.partitionBy(col("id")).orderBy(col("date"))
val sortedDf = df.withColumn("values_sorted_by_date", collect_list("value").over(window))
sortedDf.show()
//+---+----------+-----+---------------------+
//| id|      date|value|values_sorted_by_date|
//+---+----------+-----+---------------------+
//|  1|2014-01-03|   10|                 [10]|
//|  1|2014-01-04|    5|              [10, 5]|
//|  1|2014-01-05|   15|          [10, 5, 15]|
//|  1|2014-01-06|   20|      [10, 5, 15, 20]|
//|  2|2014-02-10|  100|                [100]|
//|  2|2014-02-11|  500|           [100, 500]|
//|  2|2014-02-15| 1500|     [100, 500, 1500]|
//+---+----------+-----+---------------------+

val r2 = sortedDf.groupBy(col("id"))
  .agg(max("values_sorted_by_date").as("value_list")) 
r2.show()
//+---+----------------+
//| id|      value_list|
//+---+----------------+
//|  1| [10, 5, 15, 20]|
//|  2|[100, 500, 1500]|
//+---+----------------+

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('id').orderBy('date')

sorted_list_df = input_df.withColumn(
            'sorted_list', F.collect_list('value').over(w)
        )\
        .groupBy('id')\
        .agg(F.max('sorted_list').alias('sorted_list'))

Window كثير من الأحيان لا تفسر أمثلة Window التي يقدمها المستخدمون ما يجري ، لذلك اسمح لي بتشريحه نيابة عنك.

كما تعلمون ، فإن استخدام collect_list مع groupBy سينتج عنه قائمة غير مرتبة من القيم. هذا لأنه بناءً على كيفية تقسيم بياناتك ، ستقوم Spark بإلحاق القيم بقائمتك بمجرد العثور على صف في المجموعة. ثم يعتمد الأمر على كيفية تخطيط Spark لتجميعك على المنفذين.

تسمح لك وظيفة Window بالتحكم في هذا الموقف ، وتجميع الصفوف حسب قيمة معينة حتى تتمكن من إجراء عملية over كل مجموعة من المجموعات الناتجة:

w = Window.partitionBy('id').orderBy('date')
  • partitionBy - تريد مجموعات / أقسام من الصفوف بنفس id
  • orderBy - تريد فرز كل صف في المجموعة حسب date

بمجرد تحديد نطاق نافذتك - "الصفوف ذات id نفسه ، مرتبة حسب date " - ، يمكنك استخدامه لإجراء عملية فوقها ، وفي هذه الحالة ، قائمة collect_list :

F.collect_list('value').over(w)

في هذه المرحلة ، أنشأت عمودًا جديدًا sorted_list مع قائمة قيم مرتبة مرتبة حسب التاريخ ، ولكن لا يزال لديك صفوف مكررة لكل id . لتقليص الصفوف المكررة التي تريد groupBy id والحفاظ على القيمة max لكل مجموعة:

.groupBy('id')\
.agg(F.max('sorted_list').alias('sorted_list'))




pyspark