apache spark - TypeError: العمود غير قابل للتكرار-كيفية التكرار عبر ArrayType()؟
apache-spark pyspark (2)
خذ بعين الاعتبار DataFrame التالية:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
والتي يمكن إنشاؤها باستخدام الكود التالي:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
هل هناك طريقة لتعديل
"names"
عمود
ArrayType()
مباشرةً عن طريق تطبيق دالة على كل عنصر ، دون استخدام
udf
؟
على سبيل المثال ، افترض أنني أردت تطبيق الدالة
foo
على عمود
"names"
.
(
str.upper
المثال حيث تكون
foo
str.upper
فقط لأغراض التوضيح ، لكن سؤالي يتعلق بأي وظيفة صالحة يمكن تطبيقها على عناصر قابلة للتكرار.)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
TypeError: العمود غير قابل للتكرار
يمكنني القيام بذلك باستخدام
udf
:
foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
في هذا المثال بالذات ، يمكنني تجنب
udf
عن طريق انفجار العمود ، والاتصال بـ
pyspark.sql.functions.upper()
، ثم
groupBy
و
collect_list
:
df.select('type', f.explode('names').alias('name'))\
.withColumn('name', f.upper(f.col('name')))\
.groupBy('type')\
.agg(f.collect_list('name').alias('names'))\
.show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
ولكن هذا كثير من التعليمات البرمجية للقيام بشيء بسيط.
هل هناك طريقة أكثر مباشرة للتكرار على عناصر
ArrayType()
باستخدام دالات شرارة dataframe؟
في Spark <2.4 ، يمكنك استخدام دالة معرفة من قبل المستخدم:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType
def transform(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@udf(ArrayType(t))
def _(xs):
if xs is not None:
return [f(x) for x in xs]
return _
foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
بالنظر إلى التكلفة العالية
collect_list
explode
+
collect_list
، يُفضل هذا النهج على وجه الحصر تقريبًا ، على الرغم من تكلفته الأساسية.
في
Spark 2.4
أو الأحدث ، يمكنك استخدام
transform
* مع
upper
(انظر
SPARK-23909
):
from pyspark.sql.functions import expr
df.withColumn(
'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
من الممكن أيضًا استخدام
pandas_udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
def transform_pandas(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
def _(xs):
return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
return _
foo_udf_pandas = transform_pandas(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
على الرغم من أن أحدث مجموعات Arrow / PySpark تدعم التعامل مع أعمدة
ArrayType
(
SPARK-24259
،
SPARK-21187
).
ومع ذلك ، يجب أن يكون هذا الخيار أكثر كفاءة من UDF القياسي (خاصةً مع انخفاض النفقات العامة) مع دعم وظائف Python التعسفية.
*
يتم أيضًا دعم عدد من وظائف الترتيب العالي الأخرى
، بما في ذلك ، على سبيل المثال لا الحصر ،
filter
aggregate
.
انظر على سبيل المثال
- الاستعلام عن شرارة SQL DataFrame مع أنواع معقدة
- كيفية شريحة ومجموع عناصر عمود الصفيف؟
- تصفية محتوى عمود الصفيف
- شرارة سكالا المتوسط الحكيم من خلال التعامل مع فارغة .
- كيفية استخدام تحويل وظيفة ترتيب أعلى؟ .
نعم يمكنك القيام بذلك عن طريق تحويله إلى RDD ثم العودة إلى DF.
>>> df.show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
>>> df.rdd.mapValues(lambda x: [y.upper() for y in x]).toDF(["type","names"]).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+