PySpark row-wise function composition





As a simplified example, I have a dataframe "df" with columns "col1" and "col2", and I want to compute a row-wise maximum after applying a function to each column:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def f(x):
    return x + 1

max_udf = udf(lambda x, y: max(x, y), IntegerType())
f_udf = udf(f, IntegerType())

df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))

So if df:

col1   col2
1      2
3      0

Then

df2:

col1   col2  result
1      2     3
3      0     4

The above doesn't seem to work and produces "Cannot evaluate expression: PythonUDF#f..."

I'm absolutely positive "f_udf" works just fine on my table, and the main issue is with the max_udf.

Without creating extra columns or using basic map/reduce, is there a way to do the above entirely using dataframes and udfs? How should I modify "max_udf"?

I've also tried:

max_udf=udf(max, IntegerType())

which produces the same error.

I've also confirmed that the following works:

df2 = (df.withColumn("temp1", f_udf(df.col1))
         .withColumn("temp2", f_udf(df.col2)))

df2 = df2.withColumn("result", max_udf(df2.temp1, df2.temp2))

Why is it that I can't do these in one go?

I would like to see an answer that generalizes to any function "f_udf" and "max_udf."


A UserDefinedFunction throws an error when it is passed other UDFs as arguments.

You can modify max_udf as follows to make it work:

df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"])

max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType())

df2 = df.withColumn("result", max_udf(df.col1, df.col2))

Or

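def f_udf(x):
    # Plain Python function: applied to a Column, x + 1 builds the SQL expression (col + 1)
    return x + 1

max_udf = udf(lambda x, y: max(x, y), IntegerType())
# Note: f_udf is deliberately NOT wrapped with udf(f, IntegerType())

df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))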

Note:

The second approach is valid only if the inner functions (here f_udf) produce valid SQL expressions when applied to a Column.

It works here because f_udf(df.col1) and f_udf(df.col2) are evaluated as Column<b'(col1 + 1)'> and Column<b'(col2 + 1)'> respectively, before being passed to max_udf. It would not work with an arbitrary function.

For example, the following fails, because math.exp expects a number and cannot operate on a Column:

from math import exp

df.withColumn("result", max_udf(exp(df.col1), exp(df.col2)))
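One way to generalize the first approach to other functions is to compose them in plain Python and wrap only the composition in a single udf. The following is a minimal sketch, assuming both functions are ordinary Python callables; the helper names compose_rowwise and with_result are hypothetical and not part of PySpark.

```python
import math


def compose_rowwise(outer, inner):
    """Build a plain Python function: apply `inner` to each argument, then `outer`."""
    def composed(*args):
        return outer(*(inner(a) for a in args))
    return composed


def f(x):
    return x + 1


# Pure Python compositions; no Spark is involved yet.
max_of_f = compose_rowwise(max, f)
max_of_exp = compose_rowwise(max, math.exp)


def with_result(df):
    """Add a "result" column to a DataFrame with integer columns col1 and col2.

    pyspark is imported lazily so the helpers above stay usable without Spark.
    """
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # A single udf applied to plain columns, so no udf receives another udf.
    max_udf = udf(max_of_f, IntegerType())
    return df.withColumn("result", max_udf(df["col1"], df["col2"]))
```

Calling with_result(df) on the example dataframe should give the same "result" column as the first approach. For max_of_exp the udf's return type would need to be DoubleType() instead. Note that this sketch does not handle null values, and that max needs at least two arguments when called this way.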

I had a similar problem and found the solution in the answer to this question

To pass multiple columns or a whole row to a UDF, use a struct:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))

count_empty_columns = udf(lambda row: len([x for x in row if x is None]), IntegerType())

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))

new_df.show()

returns:

+----+----+----------+
|   a|   b|null_count|
+----+----+----------+
|null|null|         2|
|   1|null|         1|
|null|   2|         1|
+----+----+----------+
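The same struct technique could be applied to the row-wise maximum from the question. This is a rough sketch under the assumption that every column is an integer column; row_max_of_f and with_result are hypothetical names, and skipping nulls is a design choice made here, not something the question specifies.

```python
def f(x):
    return x + 1


def row_max_of_f(row):
    """Apply f to every non-null value in the row and return the maximum.

    `row` can be any iterable of values; a struct column arrives in the udf
    as a Row, which behaves like a tuple.
    """
    values = [f(x) for x in row if x is not None]
    return max(values) if values else None


def with_result(df):
    """Add a "result" column computed across all columns of `df`."""
    # Lazy import keeps row_max_of_f usable (and testable) without Spark.
    from pyspark.sql.functions import udf, struct
    from pyspark.sql.types import IntegerType

    row_udf = udf(row_max_of_f, IntegerType())
    return df.withColumn("result", row_udf(struct(*[df[c] for c in df.columns])))
```

Because the row logic lives in an ordinary function, it can be checked on plain tuples such as row_max_of_f((1, 2)) before running it on a cluster.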



