apache spark - TypeError: العمود غير قابل للتكرار-كيفية التكرار عبر ArrayType()؟




apache-spark pyspark (2)

خذ بعين الاعتبار DataFrame التالية:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

والتي يمكن إنشاؤها باستخدام الكود التالي:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)

هل هناك طريقة لتعديل "names" عمود ArrayType() مباشرةً عن طريق تطبيق دالة على كل عنصر ، دون استخدام udf ؟

على سبيل المثال ، افترض أنني أردت تطبيق الدالة foo على عمود "names" . ( str.upper المثال حيث تكون foo str.upper فقط لأغراض التوضيح ، لكن سؤالي يتعلق بأي وظيفة صالحة يمكن تطبيقها على عناصر قابلة للتكرار.)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()

TypeError: العمود غير قابل للتكرار

يمكنني القيام بذلك باستخدام udf :

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

في هذا المثال بالذات ، يمكنني تجنب udf عن طريق انفجار العمود ، والاتصال بـ pyspark.sql.functions.upper() ، ثم groupBy و collect_list :

df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

ولكن هذا كثير من التعليمات البرمجية للقيام بشيء بسيط. هل هناك طريقة أكثر مباشرة للتكرار على عناصر ArrayType() باستخدام دالات شرارة dataframe؟


في Spark <2.4 ، يمكنك استخدام دالة معرفة من قبل المستخدم:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

بالنظر إلى التكلفة العالية collect_list explode + collect_list ، يُفضل هذا النهج على وجه الحصر تقريبًا ، على الرغم من تكلفته الأساسية.

في Spark 2.4 أو الأحدث ، يمكنك استخدام transform * مع upper (انظر SPARK-23909 ):

from pyspark.sql.functions import expr

df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

من الممكن أيضًا استخدام pandas_udf

from pyspark.sql.functions import pandas_udf, PandasUDFType

def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _

foo_udf_pandas = transform_pandas(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

على الرغم من أن أحدث مجموعات Arrow / PySpark تدعم التعامل مع أعمدة ArrayType ( SPARK-24259 ، SPARK-21187 ). ومع ذلك ، يجب أن يكون هذا الخيار أكثر كفاءة من UDF القياسي (خاصةً مع انخفاض النفقات العامة) مع دعم وظائف Python التعسفية.

* يتم أيضًا دعم عدد من وظائف الترتيب العالي الأخرى ، بما في ذلك ، على سبيل المثال لا الحصر ، filter aggregate . انظر على سبيل المثال

  • الاستعلام عن شرارة SQL DataFrame مع أنواع معقدة
  • كيفية شريحة ومجموع عناصر عمود الصفيف؟
  • تصفية محتوى عمود الصفيف
  • شرارة سكالا المتوسط ​​الحكيم من خلال التعامل مع فارغة .
  • كيفية استخدام تحويل وظيفة ترتيب أعلى؟ .

نعم يمكنك القيام بذلك عن طريق تحويله إلى RDD ثم العودة إلى DF.

>>> df.show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

>>> df.rdd.mapValues(lambda x: [y.upper() for y in x]).toDF(["type","names"]).show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+




pyspark-sql