apache spark - شرارة-نافذة مع العودية؟-نشر القيم بشكل مشروط عبر الصفوف



apache-spark pyspark (1)

لدي داتافريم التالية التي تبين إيرادات المشتريات.

+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
|      1|       1|      0|
|      1|       2|      0|
|      1|       3|      0|
|      1|       4|    100|
|      1|       5|      0|
|      1|       6|      0|
|      1|       7|    200|
|      1|       8|      0|
|      1|       9|     10|
+-------+--------+-------+

في نهاية المطاف، أرغب في أن purch_revenue عمود purch_revenue العمود الجديد الأرباح الناتجة عن عملية الشراء في كل صف. وكحل بديل، حاولت أيضا تقديم معرف الشراء purch_id الذي يتزايد في كل مرة يتم فيها إجراء عملية شراء. لذلك يتم سرد هذا فقط كمرجع.

+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
|      1|       1|      0|          100|       1|
|      1|       2|      0|          100|       1|
|      1|       3|      0|          100|       1|
|      1|       4|    100|          100|       1|
|      1|       5|      0|          100|       2|
|      1|       6|      0|          100|       2|
|      1|       7|    200|          100|       2|
|      1|       8|      0|          100|       3|
|      1|       9|     10|          100|       3|
+-------+--------+-------+-------------+--------+

لقد حاولت استخدام وظيفة lag/lead مثل هذا:

user_timeline = Window.partitionBy("user_id").orderBy("visit_id")
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))\ 
  .otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline))
df.withColumn("purch_revenue", find_rev)

يؤدي هذا إلى تكرار عمود الأرباح إذا كانت revenue > 0 ، كما أنه يسحبه بمقدار صف واحد. ومن الواضح أنني يمكن أن سلسلة هذا ل N محدود، ولكن هذا ليس حلا.

  • هل هناك طريقة لتطبيق هذا بشكل متكرر حتى revenue > 0 ؟
  • بدلا من ذلك، هل هناك طريقة لزيادة قيمة استنادا إلى شرط؟ لقد حاولت معرفة طريقة للقيام بذلك ولكن ناضلت للعثور على واحد.

وظائف نافذة لا تدعم العودية ولكن ليس مطلوبا هنا. هذا النوع من سيسيونيزاتيون يمكن التعامل معها بسهولة مع المجموع التراكمي:

from pyspark.sql.functions import col, sum, when, lag
from pyspark.sql.window import Window

w = Window.partitionBy("user_id").orderBy("visit_id")
purch_id = sum(lag(when(
    col("revenue") > 0, 1).otherwise(0), 
    1, 0
).over(w)).over(w) + 1

df.withColumn("purch_id", purch_id).show()
+-------+--------+-------+--------+
|user_id|visit_id|revenue|purch_id|
+-------+--------+-------+--------+
|      1|       1|      0|       1|
|      1|       2|      0|       1|
|      1|       3|      0|       1|
|      1|       4|    100|       1|
|      1|       5|      0|       2|
|      1|       6|      0|       2|
|      1|       7|    200|       2|
|      1|       8|      0|       3|
|      1|       9|     10|       3|
+-------+--------+-------+--------+




pyspark-sql