apache spark - تقسيم عمود سلسلة شرارة Dataframe إلى أعمدة متعددة




apache-spark pyspark (2)

لقد رأيت العديد من الأشخاص يقترحون أن Dataframe.explode هي طريقة مفيدة للقيام بذلك ، ولكنها تؤدي إلى صفوف أكثر من dataframe الأصلية ، وهو ما لا أريده على الإطلاق. أريد ببساطة أن أفعل ما يعادل Dataframe من بسيط للغاية:

rdd.map(lambda row: row + [row.my_str_col.split('-')])

الذي يأخذ شيئا يشبه:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

ويحولها إلى هذا:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

أنا على علم بـ pyspark.sql.functions.split() ، لكنه ينتج عمود صف متداخل بدلاً من عمودين من المستوى الأعلى مثلما أريد.

من الناحية المثالية ، أريد تسمية هذه الأعمدة الجديدة أيضًا.


فيما يلي حل للحالة العامة التي لا تتضمن الحاجة إلى معرفة طول الصفيف في وقت مبكر ، أو استخدام collect ، أو استخدام udf . لسوء الحظ ، يعمل هذا فقط مع الإصدار 2.1 وما posexplode spark ، لأنه يتطلب وظيفة posexplode .

افترض أن لديك DataFrame التالية:

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

قم posexplode عمود letters ثم استخدام posexplode لتفجير الصفيف الناتج مع الموضع في الصفيف. استخدم التالي pyspark.sql.functions.expr للاستيلاء على العنصر في مؤشر الفهرس في هذه المجموعة.

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

الآن نقوم بإنشاء عمودين جديدين من هذه النتيجة. الأول هو اسم العمود الجديد ، والذي سيكون عبارة عن سلسلة من letter والفهرس في الصفيف. سيكون العمود الثاني القيمة في الفهرس المقابل في الصفيف. نحصل على هذا الأخير من خلال استغلال وظائف pyspark.sql.functions.expr والذي يسمح لنا باستخدام قيم الأعمدة كمعلمات .

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

الآن يمكننا groupBy num pivot DataFrame. بوضع كل ذلك معا ، نحصل على:

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+

لقد وجدت حلاً للحالة غير المستوية العامة (أو عندما تحصل على الأعمدة المتداخلة ، التي تم الحصول عليها باستخدام دالة .split ()):

import pyspark.sql.functions as f

@f.udf(StructType([StructField(col_3, StringType(), True),
                   StructField(col_4, StringType(), True)]))

 def splitCols(array):
    return array[0],  ''.join(array[1:len(array)])

 df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
        .select(df.columns+['name.*'])

بشكل أساسي ، تحتاج فقط إلى تحديد جميع الأعمدة السابقة + العمود المتداخل 'column_name. *' وستحصل عليها كعمودين من المستوى الأعلى في هذه الحالة.





pyspark-sql