python - إعادة تسمية الحقل المتداخل في شرارة dataframe




apache-spark pyspark (2)

وجود dataframe df في سبارك:

 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

كيفية إعادة تسمية الحقل array_field.a إلى array_field.a_renamed ؟

[تحديث]:

.withColumnRenamed() لا يعمل مع الحقول المتداخلة لذلك جربت هذه الطريقة المتسللة وغير الآمنة:

# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'

ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

# Then set dataframe's schema with altered schema
df._schema = schema

أعلم أن تعيين سمة خاصة ليس ممارسة جيدة ، لكنني لا أعرف طريقة أخرى لتعيين مخطط df

أعتقد أنني على المسار الصحيح ولكن df.printSchema() لا يزال يعرض الاسم القديم لـ array_field.a ، على الرغم من أن df.schema == schema True


يمكنك الاسترداد عبر مخطط إطار البيانات لإنشاء مخطط جديد بالتغييرات المطلوبة.

المخطط في PySpark هو StructType الذي يحتوي على قائمة StructFields ويمكن لكل StructField الاحتفاظ بنوع أولي أو StructType آخر.

هذا يعني أنه يمكننا تحديد ما إذا كنا نريد إعادة التدوير استنادًا إلى ما إذا كان النوع هو نوع بنية أم لا.

يوجد أدناه نموذج تنفيذي توضيحي يوضح لك كيف يمكنك تنفيذ الفكرة أعلاه.

 # Some imports from pyspark.sql import * from copy import copy # We take a dataframe and return a new one with required changes def cleanDataFrame(df: DataFrame) -> DataFrame: # Returns a new sanitized field name (this function can be anything really) def sanitizeFieldName(s: str) -> str: return s.replace("-", "_").replace("&", "_").replace("\"", "_")\ .replace("[", "_").replace("]", "_").replace(".", "_") # We call this on all fields to create a copy and to perform any changes we might # want to do to the field. def sanitizeField(field: StructField) -> StructField: field = copy(field) field.name = sanitizeFieldName(field.name) # We recursively call cleanSchema on all types field.dataType = cleanSchema(field.dataType) return field def cleanSchema(dataType: [DataType]) -> [DateType]: dataType = copy(dataType) # If the type is a StructType we need to recurse otherwise we can return since # we've reached the leaf node if isinstance(dataType, StructType): # We call our sanitizer for all top level fields dataType.fields = [sanitizeField(f) for f in dataType.fields] elif isinstance(dataType, ArrayType): dataType.elementType = cleanSchema(dataType.elementType) return dataType # Now since we have the new schema we can create a new DataFrame by using the old Frame's RDD as data and the new schema as the schema for the data return spark.createDataFrame(df.rdd, cleanSchema(df.schema)) 

الثعبان

لا يمكن تعديل حقل متداخل واحد. لديك لإعادة إنشاء هيكل كامل. في هذه الحالة بالذات ، الحل الأبسط هو استخدام cast .

أولاً حفنة من الواردات:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

ومثال البيانات:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

دعنا نؤكد أن المخطط هو نفسه كما في حالتك:

df.printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

يمكنك تحديد مخطط جديد على سبيل المثال كسلسلة:

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

أو نوع DataType :

struct_schema = ArrayType(StructType([
    StructField("a_renamed", StringType()),
    StructField("b", LongType()),
    StructField("c", LongType())
]))

 df.select(col("array_field").cast(struct_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

سكالا

يمكن استخدام نفس التقنيات في Scala:

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

أو

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
    StructField("a_renamed", StringType),
    StructField("b", LongType),
    StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

التحسينات الممكنة :

إذا كنت تستخدم معالجة بيانات معبرة أو مكتبة معالجة JSON ، فقد يكون من الأسهل تفريغ أنواع البيانات dict أو سلسلة JSON ونقلها من هناك على سبيل المثال (Python / toolz ):

from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter

# Update name to "a_updated" if name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

updated_schema = pipe(
   #  Get schema of the field as a dict
   df.schema["array_field"].jsonValue(),
   # Update fields with rename
   update_in(
       keys=["type", "elementType", "fields"],
       func=lambda x: pipe(x, map(rename_field), list)),
   # Load schema from dict
   StructField.fromJson,
   # Get data type
   attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()




rename