apache spark - شرارة-نافذة مع العودية؟-نشر القيم بشكل مشروط عبر الصفوف



apache-spark pyspark (1)

وظائف نافذة لا تدعم العودية ولكن ليس مطلوبا هنا. هذا النوع من سيسيونيزاتيون يمكن التعامل معها بسهولة مع المجموع التراكمي:

from pyspark.sql.functions import col, sum, when, lag
from pyspark.sql.window import Window

w = Window.partitionBy("user_id").orderBy("visit_id")
purch_id = sum(lag(when(
    col("revenue") > 0, 1).otherwise(0), 
    1, 0
).over(w)).over(w) + 1

df.withColumn("purch_id", purch_id).show()
+-------+--------+-------+--------+
|user_id|visit_id|revenue|purch_id|
+-------+--------+-------+--------+
|      1|       1|      0|       1|
|      1|       2|      0|       1|
|      1|       3|      0|       1|
|      1|       4|    100|       1|
|      1|       5|      0|       2|
|      1|       6|      0|       2|
|      1|       7|    200|       2|
|      1|       8|      0|       3|
|      1|       9|     10|       3|
+-------+--------+-------+--------+

لدي داتافريم التالية التي تبين إيرادات المشتريات.

+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
|      1|       1|      0|
|      1|       2|      0|
|      1|       3|      0|
|      1|       4|    100|
|      1|       5|      0|
|      1|       6|      0|
|      1|       7|    200|
|      1|       8|      0|
|      1|       9|     10|
+-------+--------+-------+

في نهاية المطاف، أرغب في أن purch_revenue عمود purch_revenue العمود الجديد الأرباح الناتجة عن عملية الشراء في كل صف. وكحل بديل، حاولت أيضا تقديم معرف الشراء purch_id الذي يتزايد في كل مرة يتم فيها إجراء عملية شراء. لذلك يتم سرد هذا فقط كمرجع.

+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
|      1|       1|      0|          100|       1|
|      1|       2|      0|          100|       1|
|      1|       3|      0|          100|       1|
|      1|       4|    100|          100|       1|
|      1|       5|      0|          100|       2|
|      1|       6|      0|          100|       2|
|      1|       7|    200|          100|       2|
|      1|       8|      0|          100|       3|
|      1|       9|     10|          100|       3|
+-------+--------+-------+-------------+--------+

لقد حاولت استخدام وظيفة lag/lead مثل هذا:

user_timeline = Window.partitionBy("user_id").orderBy("visit_id")
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))\ 
  .otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline))
df.withColumn("purch_revenue", find_rev)

يؤدي هذا إلى تكرار عمود الأرباح إذا كانت revenue > 0 ، كما أنه يسحبه بمقدار صف واحد. ومن الواضح أنني يمكن أن سلسلة هذا ل N محدود، ولكن هذا ليس حلا.

  • هل هناك طريقة لتطبيق هذا بشكل متكرر حتى revenue > 0 ؟
  • بدلا من ذلك، هل هناك طريقة لزيادة قيمة استنادا إلى شرط؟ لقد حاولت معرفة طريقة للقيام بذلك ولكن ناضلت للعثور على واحد.




pyspark-sql