scala - বাফারচেমা কর্মক্ষমতা সম্পর্কিত সমস্যা হিসাবে ইউডিএফ স্পষ্ট করুন অ্যারে টাইপ দিয়ে



performance apache-spark (1)

টিএল; ডিআর হয় ইউডিএএফ ব্যবহার করবেন না বা অ্যারে টাইপের পরিবর্তে আদিম ধরণের ব্যবহার করবেন না।

UserDefinedFunction ছাড়া

উভয় সমাধানের অভ্যন্তরীণ এবং বাহ্যিক উপস্থাপনার মধ্যে ব্যয়বহুল জাগলিং এড়ানো উচিত।

মানক সমষ্টি এবং pivot ব্যবহার করে

এটি স্ট্যান্ডার্ড এসকিউএল সমষ্টি ব্যবহার করে। অভ্যন্তরীণভাবে অনুকূলিত হওয়ার সময় এটি কীগুলির সংখ্যা এবং অ্যারের আকার বাড়লে এটি ব্যয়বহুল হতে পারে।

প্রদত্ত ইনপুট:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

আপনি পারেন:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

combineByKey / aggregateByKey সহ combineByKey এপিআই ব্যবহার aggregateByKey

পুরানো byKey সমষ্টিটি byKey বাফার সহ। কোনও ঘণ্টা এবং হুইসেল নেই তবে বিস্তৃত ইনপুটগুলির সাথে যুক্তিসঙ্গতভাবে ভাল অভিনয় করা উচিত। আপনি যদি ইনপুটটিকে অপ্রয়োজনীয় বলে সন্দেহ করেন তবে আপনি পরিবর্তিত Map মতো আরও দক্ষ মধ্যবর্তী প্রতিনিধিত্ব বিবেচনা করতে পারেন।

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

আদিম ধরণের সাথে ব্যবহারকারীর UserDefinedFunction ব্যবহার করা

আমি যতদূর অভ্যন্তরীণগুলি বুঝতে পারি, পারফরম্যান্সের বাধা হ'ল ArrayConverter.toCatalystImpl . ArrayConverter.toCatalystImpl

দেখে মনে হচ্ছে এটি প্রতিটি কলকে MutableAggregationBuffer.update জন্য ডাকা হয় এবং GenericArrayData প্রতিটি Row জন্য নতুন GenericArrayData বরাদ্দ করে।

আমরা যদি bufferSchema হিসাবে নতুনভাবে সংজ্ঞায়িত করি:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}

update এবং merge উভয়ই বাফারে আদিম মানগুলির সরল প্রতিস্থাপন হিসাবে প্রকাশ করা যেতে পারে। কল চেইন বেশ দীর্ঘ থাকবে, তবে এর জন্য অনুলিপি / রূপান্তর এবং ক্রেজি বরাদ্দ লাগবে না null চেক ছাড়াই আপনার অনুরূপ কিছু দরকার

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))

এবং

for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}

যথাক্রমে।

পরিশেষে evaluate উচিত Row নেওয়া এবং এটিকে আউটপুট Seq রূপান্তর করা উচিত:

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

দয়া করে মনে রাখবেন যে এই বাস্তবায়নে একটি সম্ভাব্য বাধা merge । যদিও এটি এম বালতিগুলির সাথে কোনও নতুন কার্যকারিতা সংক্রান্ত সমস্যা প্রকাশ না করা উচিত, merge প্রতিটি merge হ'ল (এম)

কে অনন্য কী এবং পি পার্টিশনের সাহায্যে একে এম * কে বলা হবে সবচেয়ে খারাপ পরিস্থিতিতে, যেখানে প্রতিটি কী প্রতিটি পার্টিশনে কমপক্ষে একবার উপস্থিত হয়। এটি কার্যকরভাবে ও (এম * এন * কে) merge উপাদানটির জটিলতা বৃদ্ধি করে।

সাধারণভাবে আপনি এটি সম্পর্কে অনেক কিছু করতে পারেন না। তবে আপনি যদি ডেটা বিতরণ সম্পর্কে সুনির্দিষ্ট অনুমানগুলি করেন (ডেটা বিরল, মূল বিতরণ অভিন্ন) তবে আপনি কিছুটা শর্টকাট করতে পারেন এবং প্রথমে পরিবর্তন করতে পারেন:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

অনুমানগুলি সন্তুষ্ট হলে এটি করা উচিত:

  • বিপরীতমুখী ঘন অ্যারে-জাতীয় Rows পরিবর্তে বিচ্ছিন্ন জোড়গুলি বদলে শফলের আকার হ্রাস করুন।
  • কেবলমাত্র আপডেটগুলি (প্রতিটি ও (1) ) ব্যবহার করে সমষ্টিগত ডেটা সম্ভবত কেবল সূচকগুলির উপসেট হিসাবে স্পর্শ করে।

তবে যদি একটি বা উভয় অনুমান সন্তুষ্ট না হয় তবে আপনি আশা করতে পারেন যে বদলের আকার বাড়বে এবং আপডেটের সংখ্যা একই থাকবে। একই সাথে ডেটা স্কিউগুলি জিনিসগুলি update - shuffle - merge দৃশ্যের তুলনায় আরও খারাপ করতে পারে।

" Dataset " Dataset " টাইপ করা Dataset সহ Dataset :

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

যা নীচে প্রদর্শিত হিসাবে ব্যবহার করা যেতে পারে

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+

আমি একটি ইউডিএএফ নিয়ে কাজ করছি যা উপাদানগুলির একটি অ্যারে ফেরত দেয়।

প্রতিটি আপডেটের জন্য ইনপুট হ'ল সূচক এবং মানের একটি দ্বিগুণ।

ইউডিএএফ যা করে তা হ'ল একই সূচীর অধীনে সমস্ত মান সংযুক্ত করা।

উদাহরণ:

ইনপুট (সূচক, মান): (2,1), (3,1), (2,3)

ফিরে আসতে হবে (0,0,4,1, ..., 0)

যুক্তিটি সূক্ষ্মভাবে কাজ করে, তবে আপডেট পদ্ধতিটি নিয়ে আমার একটি সমস্যা রয়েছে, আমার প্রয়োগটি প্রতিটি সারির জন্য কেবলমাত্র 1 টি সেল আপডেট করে , তবে সেই পদ্ধতিতে শেষ নিয়োগটি পুরো অ্যারেটি অনুলিপি করে - যা অপ্রয়োজনীয় এবং অত্যন্ত সময়সাপেক্ষ consum

এই প্রশ্নের একা আমার ক্যোয়ারি এক্সিকিউশন সময় 98% এর জন্য দায়ী।

আমার প্রশ্ন, আমি কীভাবে সেই সময়টি হ্রাস করতে পারি? পুরো বাফারটি প্রতিস্থাপন না করেই কি বাফারের অ্যারেতে 1 মান নির্ধারণ করা সম্ভব?

পিএস: আমি স্পার্ক ১.6 এর সাথে কাজ করছি এবং আমি শীঘ্রই এটিকে যে কোনও সময় আপগ্রেড করতে পারি না, সুতরাং দয়া করে এই সংস্করণে কাজ করবে এমন সমাধানে লেগে থাকুন।

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}




user-defined-functions