python - بيسبارك: حساب الصف الأقصى من مجموعة فرعية من الأعمدة وإضافة إلى داتافريم القائمة
apache-spark pyspark (1)
أود حساب الحد الأقصى لمجموعة فرعية من الأعمدة لكل صف وإضافته كعمود جديد ل Dataframe
الحالية.
تمكنت من القيام بذلك بطريقة محرجة للغاية:
def add_colmax(df,subset_columns,colnm):
'''
calculate the maximum of the selected "subset_columns" from dataframe df for each row,
new column containing row wise maximum is added to dataframe df.
df: dataframe. It must contain subset_columns as subset of columns
colnm: Name of the new column containing row-wise maximum of subset_columns
subset_columns: the subset of columns from w
'''
from pyspark.sql.functions import monotonicallyIncreasingId
from pyspark.sql import Row
def get_max_row_with_None(row):
return float(np.max(row))
df_subset = df.select(subset_columns)
rdd = df_subset.map( get_max_row_with_None)
df_rowsum = rdd.map(Row(colnm)).toDF()
df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
df = df.withColumn("id",monotonicallyIncreasingId())
df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
return df
تعمل هذه الوظيفة على النحو التالي:
rdd1 = sc.parallelize([("foo", 1.0,3.0,None),
("bar", 2.0,2.0,-10),
("baz", 3.3,1.2,10.0)])
df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")
df_new.collect()
عائدات:
[Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]
وأعتقد أنه إذا كنت يمكن أن تستخدم وظائف تعريف المستخدم مع withColumn
، وهذا يمكن أن يتم أبسط من ذلك بكثير. ولكن لم أستطع معرفة كيفية القيام بذلك. واسمحوا لي أن أعرف إذا كان لديك طريقة أبسط لتحقيق ذلك. أنا باستخدام شرارة 1.6
دعونا نبدأ مع اثنين من الواردات
from pyspark.sql.functions import col, lit, coalesce, greatest
التالي تعريف ناقص اللانهاية الحرفية:
minf = lit(float("-inf"))
خريطة الأعمدة وتمرير النتيجة إلى greatest
:
rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])
وأخيرا مع withColumn
:
df1.withColumn("rowmax", rowmax)
مع النتيجة:
+---+---+---+----+------+
| v1| v2| v3| v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null| 3.0|
|bar|2.0|2.0| -10| 2.0|
|baz|3.3|1.2|null| 3.3|
+---+---+---+----+------+
يمكنك استخدام نفس النمط مع صف مختلفة العمليات الحكيمة استبدال minf
مع عنصر محايد. فمثلا:
rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])
أو:
from operator import mul
from functools import reduce
rowproduct = reduce(
mul,
[coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
)
يمكن تبسيط التعليمات البرمجية الخاصة بك بشكل كبير مع udf
:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
def get_max_row_with_None_(*cols):
return float(max(x for x in cols if x is not None))
get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))
استبدال minf
مع lit(float("inf"))
greatest
مع least
للحصول على أصغر قيمة لكل صف.