apache spark - تقسيم عمود سلسلة شرارة Dataframe إلى أعمدة متعددة
apache-spark pyspark (2)
لقد رأيت العديد من الأشخاص يقترحون أن
Dataframe.explode
هي طريقة مفيدة للقيام بذلك ، ولكنها تؤدي إلى صفوف أكثر من dataframe الأصلية ، وهو ما لا أريده على الإطلاق.
أريد ببساطة أن أفعل ما يعادل Dataframe من بسيط للغاية:
rdd.map(lambda row: row + [row.my_str_col.split('-')])
الذي يأخذ شيئا يشبه:
col1 | my_str_col
-----+-----------
18 | 856-yygrm
201 | 777-psgdg
ويحولها إلى هذا:
col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
أنا على علم بـ
pyspark.sql.functions.split()
، لكنه ينتج عمود صف متداخل بدلاً من عمودين من المستوى الأعلى مثلما أريد.
من الناحية المثالية ، أريد تسمية هذه الأعمدة الجديدة أيضًا.
فيما يلي حل للحالة العامة التي لا تتضمن الحاجة إلى معرفة طول الصفيف في وقت مبكر ، أو استخدام
collect
، أو استخدام
udf
.
لسوء الحظ ، يعمل هذا فقط مع الإصدار 2.1 وما
posexplode
spark
، لأنه يتطلب وظيفة
posexplode
.
افترض أن لديك DataFrame التالية:
df = spark.createDataFrame(
[
[1, 'A, B, C, D'],
[2, 'E, F, G'],
[3, 'H, I'],
[4, 'J']
]
, ["num", "letters"]
)
df.show()
#+---+----------+
#|num| letters|
#+---+----------+
#| 1|A, B, C, D|
#| 2| E, F, G|
#| 3| H, I|
#| 4| J|
#+---+----------+
قم
posexplode
عمود
letters
ثم استخدام
posexplode
لتفجير الصفيف الناتج مع الموضع في الصفيف.
استخدم التالي
pyspark.sql.functions.expr
للاستيلاء على العنصر في مؤشر الفهرس في هذه المجموعة.
import pyspark.sql.functions as f
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.show()
#+---+------------+---+---+
#|num| letters|pos|val|
#+---+------------+---+---+
#| 1|[A, B, C, D]| 0| A|
#| 1|[A, B, C, D]| 1| B|
#| 1|[A, B, C, D]| 2| C|
#| 1|[A, B, C, D]| 3| D|
#| 2| [E, F, G]| 0| E|
#| 2| [E, F, G]| 1| F|
#| 2| [E, F, G]| 2| G|
#| 3| [H, I]| 0| H|
#| 3| [H, I]| 1| I|
#| 4| [J]| 0| J|
#+---+------------+---+---+
الآن نقوم بإنشاء عمودين جديدين من هذه النتيجة.
الأول هو اسم العمود الجديد ، والذي سيكون عبارة عن سلسلة من
letter
والفهرس في الصفيف.
سيكون العمود الثاني القيمة في الفهرس المقابل في الصفيف.
نحصل على هذا الأخير من خلال استغلال وظائف
pyspark.sql.functions.expr
والذي يسمح لنا
باستخدام قيم الأعمدة كمعلمات
.
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.show()
#+---+-------+---+
#|num| name|val|
#+---+-------+---+
#| 1|letter0| A|
#| 1|letter1| B|
#| 1|letter2| C|
#| 1|letter3| D|
#| 2|letter0| E|
#| 2|letter1| F|
#| 2|letter2| G|
#| 3|letter0| H|
#| 3|letter1| I|
#| 4|letter0| J|
#+---+-------+---+
الآن يمكننا
groupBy
num
pivot
DataFrame.
بوضع كل ذلك معا ، نحصل على:
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.groupBy("num").pivot("name").agg(f.first("val"))\
.show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#| 1| A| B| C| D|
#| 3| H| I| null| null|
#| 2| E| F| G| null|
#| 4| J| null| null| null|
#+---+-------+-------+-------+-------+
لقد وجدت حلاً للحالة غير المستوية العامة (أو عندما تحصل على الأعمدة المتداخلة ، التي تم الحصول عليها باستخدام دالة .split ()):
import pyspark.sql.functions as f
@f.udf(StructType([StructField(col_3, StringType(), True),
StructField(col_4, StringType(), True)]))
def splitCols(array):
return array[0], ''.join(array[1:len(array)])
df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
.select(df.columns+['name.*'])
بشكل أساسي ، تحتاج فقط إلى تحديد جميع الأعمدة السابقة + العمود المتداخل 'column_name. *' وستحصل عليها كعمودين من المستوى الأعلى في هذه الحالة.