scala - স্ট্রাকচার্ড স্ট্রিমিং ব্যবহার করে কাফকা থেকে জেএসএন ফর্ম্যাটে রেকর্ডগুলি কীভাবে পড়বেন?



apache-spark apache-kafka (1)

স্পার্ক দৃষ্টিকোণ থেকে value একটি বাইট সিকোয়েন্স। সিরিয়ালাইজেশন ফর্ম্যাট বা বিষয়বস্তু সম্পর্কে এর কোনও জ্ঞান নেই। দায়ের করা ফাইলটি উত্তোলনের জন্য আপনাকে প্রথমে এটি বিশ্লেষণ করতে হবে।

যদি ডেটাটি জেএসএন স্ট্রিং হিসাবে সিরিয়ালযুক্ত হয় তবে আপনার কাছে দুটি বিকল্প রয়েছে। আপনি StringType value দিতে StringType এবং from_json ব্যবহার করতে from_json এবং একটি স্কিমা সরবরাহ করতে পারেন:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))

বা StringType cast , StringType ব্যবহার করে পাথ দ্বারা ক্ষেত্রগুলি আহরণ করুন:

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???

val exprs = columns.map(c => get_json_object($"value", s"$$.$c"))

rawKafkaDF.select(exprs: _*)

এবং পছন্দসই ধরণের পরে cast

আমি কাফকা থেকে ডেটা স্ট্রিম লোড করতে ডেটা ফ্রেম / ডেটাসেট এপিআই এর উপর ভিত্তি করে স্পার্ক-স্ট্রিমিং ব্যবহার করে কাঠামোগত স্ট্রিমিং পদ্ধতির ব্যবহার করার চেষ্টা করছি।

আমি ব্যবহার করি:

  • স্পার্ক 2.10
  • কাফকা 0.10
  • স্ফুলিঙ্গ-SQL-কাফকা-0-10

স্পার্ক কাফকা ডেটাসোর্স অন্তর্নিহিত স্কিমাটি সংজ্ঞায়িত করেছে:

|key|value|topic|partition|offset|timestamp|timestampType|

আমার ডেটা জেসন ফর্ম্যাটে আসে এবং সেগুলি মান কলামে সঞ্চয় করা হয় । আমি কীভাবে মান কলাম থেকে অন্তর্নিহিত স্কিমা এক্সট্রাক্ট করব এবং প্রাপ্ত সঞ্চিত ডেটাফ্রেমকে মূল্যবঞ্চিত কলামগুলিতে আপডেট করব? আমি নীচের পদ্ধতির চেষ্টা করেছিলাম কিন্তু এটি কার্যকর হয় না:

 val columns = Array("column1", "column2") // column names
 val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe",topic)
  .load()
  val columnsToSelect = columns.map( x => new Column("value." + x))
  val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)

  // some analytics using stream dataframe kafkaDF

  val query = kafkaDF.writeStream.format("console").start()
  query.awaitTermination()

এখানে আমি ব্যতিক্রম org.apache.spark.sql.AnalysisException: Can't extract value from value#337; কারণ স্ট্রিম তৈরির সময়, অভ্যন্তরের মানগুলি জানা যায় না ...

আপনার কি কোন পরামর্শ আছে?