software - python tutorial




আমি কীভাবে একটি অ্যারে(অর্থাত্ তালিক্য) কলামটি ভেক্টরে রূপান্তর করব (2)

প্রশ্নের সংক্ষিপ্ত সংস্করণ!

নিম্নলিখিত স্নিপেটটি বিবেচনা করুন (ধরে নেওয়া হচ্ছে spark ইতিমধ্যে কিছু SparkSession সেট করা আছে):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

লক্ষ্য করুন যে তাপমাত্রা ক্ষেত্রটি ভাসমানগুলির একটি তালিকা। আমি ফ্লোটগুলির এই তালিকাগুলিকে এমএলিব টাইপ ভেক্টরে রূপান্তর করতে চাই এবং আমি চাই যে এই রূপান্তরটি DataFrame ব্যবহারের পরিবর্তে বেসিক DataFrame এপিআই ব্যবহার করে প্রকাশ করা উচিত (যা অক্ষম কারণ এটি জেভিএম থেকে পাইথনে সমস্ত ডেটা প্রেরণ করে, পাইথনটিতে প্রক্রিয়াজাতকরণটি সম্পন্ন হয়, আমরা স্পার্কের ক্যাটালিস্ট অপ্টিমাইজার, ইয়াদ ইয়াদ) এর সুবিধা পাই না। আমি এটা কিভাবে করবো? বিশেষ করে:

  1. একটি সরাসরি কাস্ট কাজ করার উপায় আছে? বিশদ জন্য নীচে দেখুন (এবং একটি workaround এ একটি ব্যর্থ প্রচেষ্টা)? বা, এর পরে আর কোনও অপারেশন রয়েছে যার প্রভাব আমার পরে ছিল?
  2. আমি নীচে প্রস্তাবিত দুটি বিকল্প সমাধানগুলির মধ্যে কোনটি আরও দক্ষ? (তালিকার আইটেমগুলি বিস্ফোরিত / পুনরায় সমাবেশ) বা অন্য কোনও প্রায়-তবে-সঠিক-সঠিক বিকল্প নেই যেগুলির উভয়ের চেয়ে ভাল?

একটি সোজা castালাই কাজ করে না

এটিই আমি "সঠিক" সমাধান হিসাবে প্রত্যাশা করব। আমি কলামের ধরণটি এক প্রকার থেকে অন্য প্রকারে রূপান্তর করতে চাই, সুতরাং আমার একটি কাস্ট ব্যবহার করা উচিত। কিছুটা প্রসঙ্গ হিসাবে, আমি এটিকে অন্য ধরণের কাস্ট করার স্বাভাবিক উপায়টি মনে করিয়ে দিচ্ছি:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

এখন যেমন df_with_strings.collect()[0]["temperatures"][1] হ'ল '-7.0' । তবে যদি আমি কোনও মিলি ভেক্টরকে কাস্ট করি তবে জিনিসগুলি এত ভাল হয় না:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

এটি একটি ত্রুটি দেয়:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

বাবা! এটা ঠিক করার কোন ধারনা তোমার আছ?

সম্ভাব্য বিকল্প

বিকল্প 1: VectorAssembler ব্যবহার করে

একটি Transformer যা এই কাজের জন্য প্রায় আদর্শ বলে মনে হচ্ছে: VectorAssembler । এটি এক বা একাধিক কলাম নেয় এবং এগুলিকে একটি একক ভেক্টরের সাথে যুক্ত করে তোলে। দুর্ভাগ্যক্রমে এটি কেবল Vector এবং Float কলামগুলি নেয়, Array কলামগুলি নয়, সুতরাং Array কাজ করে না:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

এটি এই ত্রুটি দেয়:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

আমি ভাবতে পারি সবচেয়ে ভাল কাজ হ'ল তালিকাটি একাধিক কলামে বিস্ফোরিত করা এবং তারপরে সেগুলি আবার ব্যাক আপ করার জন্য VectorAssembler ব্যবহার করুন:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

এটি দেখে মনে হচ্ছে এটি আদর্শ হবে, কেবল TEMPERATURE_COUNT টি 100 এরও বেশি এবং কখনও কখনও 1000 এরও বেশি ((অন্য সমস্যাটি হ'ল কোডটি আরও জটিল হবে যদি আপনি আগে থেকে অ্যারের আকার না জানেন তবে এটি এমন কি আমার ডেটা-র ক্ষেত্রে নয়।) স্পার্ক আসলে সেই বহু কলামের সাহায্যে একটি মধ্যবর্তী ডেটা উত্পন্ন করে বা পৃথক আইটেমগুলি ক্ষণস্থায়ীভাবে পাস করার মধ্যবর্তী পদক্ষেপটি বিবেচনা করে (বা সত্যই এটি পুরোপুরি এই পদক্ষেপটিকে অপ্টিমাইজ করে যখন এটি দেখেন যে এই কলামগুলির একমাত্র ব্যবহারটি ভেক্টরে একত্রিত হতে হবে)?

বিকল্প 2: একটি ইউডিএফ ব্যবহার করুন

পরিবর্তনের জন্য একটি সহজ বিকল্প হ'ল ইউডিএফ ব্যবহার করা। এটি আমাকে কোডের এক লাইনে কী করতে চাই তা প্রত্যক্ষভাবে প্রকাশ করতে দেয় এবং ক্রেজি সংখ্যক কলাম সহ একটি ডেটা সেট করার প্রয়োজন হয় না। তবে পাইথন এবং জেভিএম-এর মধ্যে সমস্ত ডেটা আদান প্রদান করতে হয় এবং প্রতিটি পৃথক নম্বর পাইথন দ্বারা পরিচালনা করতে হয় (যা পৃথক ডেটা আইটেমগুলিতে পুনরাবৃত্তি করার জন্য কুখ্যাতভাবে ধীর হয়)। এখানে এটি কেমন দেখাচ্ছে:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

অবজ্ঞাপূর্ণ মন্তব্য

এই কাঁপানো প্রশ্নের বাকী অংশগুলি উত্তর খুঁজে পাওয়ার চেষ্টা করার সময় আমি কিছু অতিরিক্ত জিনিস নিয়ে এসেছি। এটি সম্ভবত বেশিরভাগ লোকেরা পড়ে এড়িয়ে যেতে পারেন।

কোনও সমাধান নয়: শুরুতে Vector ব্যবহার করুন

এই তুচ্ছ উদাহরণে ভেক্টর প্রকারটি দিয়ে শুরু করে ডেটা তৈরি করা সম্ভব, তবে অবশ্যই আমার ডেটা আসলে পাইথন তালিকাগুলি নয় যা আমি সমান্তরাল করছি, বরং এর পরিবর্তে কোনও ডেটা উত্স থেকে পড়া হচ্ছে। তবে রেকর্ডটির জন্য, এখানে এটি দেখতে কেমন হবে:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

অপর্যাপ্ত সমাধান: map() ব্যবহার করুন map()

একটি সম্ভাবনা হ'ল তালিকাটি কোনও Vector রূপান্তর করতে আরডিডি map() পদ্ধতিটি ব্যবহার করা। এটি ইউডিএফ ধারণার অনুরূপ, এটির চেয়েও খারাপ কারণ সিরিয়ালাইজেশন ইত্যাদির ব্যয় প্রতিটি সারির সমস্ত ক্ষেত্রের জন্য ব্যয় করা হয়, কেবল চালিত হওয়াতে নয়। রেকর্ডের জন্য, সমাধানটি দেখতে কেমন হবে তা এখানে:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Castালাইয়ের জন্য কোনও কাজের ব্যর্থ প্রচেষ্টা

হতাশায়, আমি লক্ষ্য করেছি যে Vector চারটি ক্ষেত্র সহ একটি কাঠামোর দ্বারা অভ্যন্তরীণভাবে উপস্থাপিত হয়েছে, তবে সেই ধরণের স্ট্রাক্ট থেকে traditionalতিহ্যবাহী কাস্ট ব্যবহার করা কোনওভাবেই কাজ করে না। এখানে একটি চিত্র তুলে ধরা হয়েছে (যেখানে আমি ইউডিএফ ব্যবহার করে কাঠামোটি তৈরি করেছি তবে ইউডিএফ গুরুত্বপূর্ণ অংশ নয়):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

এটি ত্রুটি দেয়:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"

আপনার মত আমারও একই সমস্যা ছিল এবং আমি এইভাবে করেছি। এই উপায়টিতে আরডিডি রূপান্তর অন্তর্ভুক্ত রয়েছে, সুতরাং এটি সম্পাদন সমালোচনা নয়, তবে এটি কার্যকর করে।

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

ফলাফল হলো,

DataFrame[city: string, temperatures: vector]

ব্যক্তিগতভাবে আমি পাইথন ইউডিএফের সাথে যেতাম এবং অন্য কিছু নিয়ে বিরক্ত করতাম না:

  • Vectors স্থানীয় এসকিউএল প্রকার নয় তাই একরকম বা অন্য কোনও উপায়ে ওভারহেডের কার্যকারিতা থাকবে। বিশেষত এই প্রক্রিয়াটির জন্য দুটি পদক্ষেপের প্রয়োজন যেখানে ডেটা প্রথমে বাহ্যিক প্রকার থেকে সারিতে রূপান্তরিত হয় এবং তারপরে সারি থেকে জেনেরিক RowEncoder ব্যবহার করে অভ্যন্তরীণ উপস্থাপনায় RowEncoder
  • যে কোনও ডাউন স্ট্রিম এমএল Pipeline একটি সাধারণ রূপান্তর চেয়ে অনেক বেশি ব্যয়বহুল হবে। তবুও এটির জন্য একটি প্রক্রিয়া প্রয়োজন যা উপরের বর্ণিত পদ্ধতির বিপরীতে

তবে আপনি যদি এখানে অন্য বিকল্পগুলি চান তবে আপনি হলেন:

  • পাইথন র‍্যাপার সহ স্কালা ইউডিএফ:

    প্রকল্পের সাইটের নির্দেশাবলী অনুসরণ করে sbt ইনস্টল করুন।

    নিম্নলিখিত কাঠামো সহ স্কালা প্যাকেজ তৈরি করুন:

    .
    ├── build.sbt
    └── udfs.scala

    build.sbt সম্পাদনা করুন (স্কালা এবং স্পার্ক সংস্করণ প্রতিফলিত করতে সমন্বয় করুন):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.spark" %% "spark-mllib" % "2.1.0"
    )

    udfs.scala সম্পাদনা করুন:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }

    প্যাকেজ:

    sbt package

    এবং অন্তর্ভুক্ত (বা স্ক্যালার উপর নির্ভর করে সমতুল্য:

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

    শেল / অ্যাপ্লিকেশন জমা দেওয়ার সময় --driver-class-path আর্গুমেন্ট হিসাবে।

    পাইস্পার্কে একটি মোড়ক সংজ্ঞায়িত করুন:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    টেস্ট:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
  • DenseVector স্কিমা প্রতিফলিত করে একটি JSON ফর্ম্যাটে ডেটা DenseVector এবং এটি আবার পড়ুন:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    with_parsed_vector.printSchema()
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)






apache-spark-ml