python - بيسبارك: حساب الصف الأقصى من مجموعة فرعية من الأعمدة وإضافة إلى داتافريم القائمة



apache-spark pyspark (1)

أود حساب الحد الأقصى لمجموعة فرعية من الأعمدة لكل صف وإضافته كعمود جديد ل Dataframe الحالية.

تمكنت من القيام بذلك بطريقة محرجة للغاية:

def add_colmax(df,subset_columns,colnm):
     '''
     calculate the maximum of the selected "subset_columns" from dataframe df for each row, 
     new column containing row wise maximum is added to dataframe df. 

     df: dataframe. It must contain subset_columns as subset of columns
     colnm: Name of the new column containing row-wise maximum of subset_columns
     subset_columns: the subset of columns from w
     '''
     from pyspark.sql.functions import monotonicallyIncreasingId
     from pyspark.sql import Row
     def get_max_row_with_None(row):
         return float(np.max(row))

     df_subset = df.select(subset_columns)
     rdd = df_subset.map( get_max_row_with_None)
     df_rowsum = rdd.map(Row(colnm)).toDF()
     df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
     df = df.withColumn("id",monotonicallyIncreasingId())
     df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
     return df

تعمل هذه الوظيفة على النحو التالي:

rdd1 =  sc.parallelize([("foo", 1.0,3.0,None), 
                    ("bar", 2.0,2.0,-10), 
                    ("baz", 3.3,1.2,10.0)])


df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")   
df_new.collect()

عائدات:

 [Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
  Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
  Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]

وأعتقد أنه إذا كنت يمكن أن تستخدم وظائف تعريف المستخدم مع withColumn ، وهذا يمكن أن يتم أبسط من ذلك بكثير. ولكن لم أستطع معرفة كيفية القيام بذلك. واسمحوا لي أن أعرف إذا كان لديك طريقة أبسط لتحقيق ذلك. أنا باستخدام شرارة 1.6


دعونا نبدأ مع اثنين من الواردات

from pyspark.sql.functions import col, lit, coalesce, greatest

التالي تعريف ناقص اللانهاية الحرفية:

minf = lit(float("-inf"))

خريطة الأعمدة وتمرير النتيجة إلى greatest :

rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])

وأخيرا مع withColumn :

df1.withColumn("rowmax", rowmax)

مع النتيجة:

+---+---+---+----+------+
| v1| v2| v3|  v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null|   3.0|
|bar|2.0|2.0| -10|   2.0|
|baz|3.3|1.2|null|   3.3|
+---+---+---+----+------+

يمكنك استخدام نفس النمط مع صف مختلفة العمليات الحكيمة استبدال minf مع عنصر محايد. فمثلا:

rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])

أو:

from operator import mul
from functools import reduce

rowproduct = reduce(
  mul, 
  [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
)

يمكن تبسيط التعليمات البرمجية الخاصة بك بشكل كبير مع udf :

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def get_max_row_with_None_(*cols):
    return float(max(x for x in cols if x is not None))

get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))

استبدال minf مع lit(float("inf")) greatest مع least للحصول على أصغر قيمة لكل صف.





pyspark-sql