呼び出し - ScalaとPythonのSparkパフォーマンス




scala python 呼び出し (2)

私はScalaよりもPythonが好きです。 しかし、SparkはScalaでネイティブに記述されているため、明らかな理由から、PythonバージョンよりもScalaでのコードの実行速度が速くなると期待していました。

その前提で、私はいくつかの1 GBのデータ用の非常に一般的な前処理コードのScalaバージョンを学び、書くことを考えました。 データは、KaggleのSpringLeafコンテストから Kaggle ます。 データの概要を示すためだけです(1936のディメンションと145232の行が含まれています)。 データは、int、float、string、booleanなどのさまざまなタイプで構成されます。 Spark処理には8コアのうち6コアを使用しています。 それが、すべてのコアが処理するために minPartitions=6 を使用した理由です。

Scalaコード

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Pythonコード

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scalaパフォーマンス ステージ0(38分)、ステージ1(18秒)

Pythonパフォーマンス ステージ0(11分)、ステージ1(7秒)

両方とも異なるDAG視覚化グラフを生成します(どちらの写真もScala( map )とPython( reduceByKey )で異なるステージ0関数を示しているため)

ただし、本質的に両方のコードは、データを(dimension_id、値リストの文字列)RDDに変換し、ディスクに保存しようとします。 出力は、各ディメンションのさまざまな統計を計算するために使用されます。

パフォーマンスの面では、このような実際のデータのScalaコードは、Pythonバージョンの 4倍の速度 で実行されるようです。 私にとって良いニュースは、Pythonを使い続ける良いモチベーションを与えてくれたことです。 悪いニュースは、私がその理由をよく理解していなかったことですか?


コードについて説明した元の回答は以下にあります。

まず第一に、それぞれが独自のパフォーマンスを考慮した異なるタイプのAPIを区別する必要があります。

RDD API

(JVMベースのオーケストレーションを備えた純粋なPython構造)

これは、PythonコードのパフォーマンスとPySpark実装の詳細によって最も影響を受けるコンポーネントです。 Pythonのパフォーマンスが問題になることはまずありませんが、考慮すべき要素が少なくともいくつかあります。

  • JVM通信のオーバーヘッド。 実際には、Pythonエグゼキューターに出入りするすべてのデータは、ソケットとJVMワーカーを介して渡される必要があります。 これは比較的効率的なローカル通信ですが、まだ無料ではありません。
  • プロセスベースのエグゼキューター(Python)対スレッドベース(シングルJVMマルチスレッド)エグゼキューター(Scala)。 各Pythonエグゼキューターは、独自のプロセスで実行されます。 副作用として、JVMの対応物より強力な分離を提供し、エグゼキュータのライフサイクルをある程度制御しますが、メモリ使用量が大幅に増加する可能性があります。

    • インタプリタのメモリフットプリント
    • ロードされたライブラリのフットプリント
    • 効率の悪いブロードキャスト(各プロセスにはブロードキャストの独自のコピーが必要です)
  • Pythonコード自体のパフォーマンス。 一般的に、ScalaはPythonよりも高速ですが、タスクごとに異なります。 さらに、 Numba ようなJIT、C拡張( Cython )、または Theano ような特殊なライブラリを含む複数のオプションがあります。 最後に、 ML / MLlib(または単にNumPyスタック)を使用しない場合は PyPy を代替インタープリターとして使用することを検討してください。 SPARK-3094 参照してください。

  • PySpark設定には、各タスクのPythonプロセスをフォークするか、既存のプロセスを再利用するかを選択するために使用できる spark.python.worker.reuse オプションが用意されています。 後者のオプションは、高価なガベージコレクションを回避するのに役立つようです(体系的なテストの結果よりも印象的です)が、前者(デフォルト)は、高価なブロードキャストとインポートの場合に最適です。
  • CPythonの最初の行のガベージコレクション方法として使用される参照カウントは、一般的なSparkワークロード(ストリームのような処理、参照サイクルなし)で非常によく機能し、長いGC一時停止のリスクを減らします。

MLlib

(PythonとJVMの混合実行)

基本的な考慮事項は以前とほとんど同じですが、いくつかの追加の問題があります。 MLlibで使用される基本構造は単純なPython RDDオブジェクトですが、すべてのアルゴリズムはScalaを使用して直接実行されます。

これは、PythonオブジェクトをScalaオブジェクトに変換するための追加コスト、およびその逆、メモリ使用量の増加、および後で説明する追加の制限を意味します。

現在(Spark 2.x)、RDDベースのAPIはメンテナンスモードであり 、Spark 3.0で削除される予定 です。

DataFrame APIおよびSpark ML

(ドライバーに限定されたPythonコードを使用したJVM実行)

これらはおそらく、標準のデータ処理タスクに最適です。 Pythonコードはドライバーでの高レベルの論理操作にほとんど制限されているため、PythonとScalaのパフォーマンスに違いはありません。

単一の例外は、行単位のPython UDFの使用であり、Scalaの同等のものよりも大幅に効率が低下します。 改善の余地はありますが(Spark 2.0.0で大幅な開発が行われています)、最大の制限は内部表現(JVM)とPythonインタープリター間の完全な往復です。 可能であれば、組み込み式の構成を優先する必要があります( example 。PythonUDFの動作はSpark 2.0.0で改善されましたが、ネイティブ実行と比較すると依然として最適ではありません。今後、 ベクトル化UDFの 導入により改善される可能性があります (SPARK-21190)

また、 DataFramesRDDs 間で不要なデータの DataFrames を行わないようにしてください。 これには、Pythonインタープリターとの間のデータ転送は言うまでもなく、高価なシリアル化と逆シリアル化が必要です。

Py4J呼び出しの待ち時間がかなり長いことに注意してください。 これには、次のような単純な呼び出しが含まれます。

from pyspark.sql.functions import col

col("foo")

通常、それは問題ではありません(オーバーヘッドは一定で、データの量に依存しません)が、ソフトリアルタイムアプリケーションの場合は、Javaラッパーのキャッシュ/再利用を検討できます。

GraphXおよびSparkデータセット

現在(Spark 1.6 2.1)に関しては、どちらもPySpark APIを提供していないため、PySparkはScalaよりもはるかに悪いと言えます。

GraphX

実際には、GraphXの開発はほぼ完全に停止し、現在プロジェクトはメンテナンスモードにあり、 関連するJIRAチケット は修正 され ないため クローズされて います GraphFrames ライブラリは、Pythonバインディングを備えた代替のグラフ処理ライブラリを提供します。

データセット

主観的に言えば、Pythonで静的に型指定された Datasets 場所はあまりなく、現在のScala実装が単純すぎて、 DataFrame と同じパフォーマンスの利点を提供していません。

ストリーミング

これまで見てきたことから、PythonではなくScalaを使用することを強くお勧めします。 PySparkが構造化ストリームのサポートを取得した場合、将来変更される可能性がありますが、現在Scala APIははるかに堅牢で包括的かつ効率的です。 私の経験はかなり限られています。

Spark 2.xの構造化ストリーミングは、言語間のギャップを減らすように見えますが、現時点ではまだ初期段階です。 それにもかかわらず、RDDベースのAPIは、 Databricksドキュメント (アクセス日2017-03-03)で既に「レガシーストリーミング」として参照されているため、さらなる統合の取り組みを期待するのが妥当です。

非パフォーマンスの考慮事項

機能パリティ

すべてのSpark機能がPySpark APIを通じて公開されるわけではありません。 必要な部品が既に実装されているかどうかを確認し、考えられる制限を理解してください。

MLlibと同様の混合コンテキストを使用する場合は特に重要です( タスクからのJava / Scala関数の呼び出しを 参照)。 公平を期すために、mllib.linalgのような mllib.linalg APIの一部は、Scalaよりも包括的なメソッドのセットを提供します。

API設計

PySpark APIは、Scalaの同等物を密接に反映しているため、厳密にはPythonicではありません。 これは、言語間のマッピングが非常に簡単であることを意味しますが、同時に、Pythonコードは非常に理解しにくいことがあります。

複雑なアーキテクチャ

PySparkのデータフローは、純粋なJVMの実行に比べて比較的複雑です。 PySparkプログラムやデバッグについて推論することははるかに困難です。 さらに、少なくともScalaとJVMの基本的な理解が必要です。

Spark 2.x以降

凍結されたRDD APIを使用した Dataset APIへの継続的な移行は、Pythonユーザーに機会と課題の両方をもたらします。 APIの高レベルの部分はPythonで公開する方がはるかに簡単ですが、より高度な機能を 直接 使用 する ことはほとんど不可能です。

さらに、ネイティブPython関数は、SQLの世界で2番目に重要な役割を果たし続けています。 Apache Arrowのシリアル化により将来これが改善されることを願っています( 現在の取り組みはデータ collection 対象としています が、UDF serdeは 長期的な目標です )。

Pythonコードベースに強く依存しているプロジェクトでは、純粋なPythonの代替( Dask Ray )が興味深い代替になる可能性があります。

どちらか一方である必要はありません

Spark DataFrame(SQL、Dataset)APIは、PySparkアプリケーションにScala / Javaコードを統合するためのエレガントな方法を提供します。 DataFrames を使用して、データをネイティブJVMコードに公開し、結果を読み戻すことができます。 他の場所で いくつかのオプションを説明しましたが、 Pyspark内でScalaクラスを使用する方法 でPython-Scala ラウンドトリップの 実用的な例を見つけることができます。

ユーザー定義型を導入することでさらに拡張できます( Spark SQLでカスタム型のスキーマを定義する方法を 参照してください)。

質問で提供されたコードの何が問題になっていますか

(免責事項:Pythonistaの観点。おそらくScalaのトリックを見逃してしまった)

まず、コードにはまったく意味をなさない部分が1つあります。 zipWithIndex を使用して (key, value) ペアを既に作成している zipWithIndex またはすぐに分割するために文字列を作成するポイントを enumerate ますか? flatMap は再帰的に flatMap しないため、タプルを生成し、次の map をスキップできます。

私が問題と reduceByKey 別の部分は reduceByKey です。 一般的に、 reduceByKey は、集計関数を適用してシャッフルする必要があるデータの量を削減できる場合に役立ちます。 単に文字列を連結するだけなので、ここでは何も得られません。 参照数などの低レベルのものを無視すると、転送する必要があるデータの量は groupByKey とまったく同じ groupByKey

通常、これについては詳しく語りませんが、Scalaコードのボトルネックであると言えます。 JVMでの文字列の結合は、かなり高価な操作です(たとえば 、「scalaでの文字列の連結はJavaでのコストと同じですか?」を 参照)。 これは、コード内の input4.reduceByKey(valsConcat) と同等の _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) ものは良い考えではないことを input4.reduceByKey(valsConcat) します。

groupByKey を避けたい場合は、 StringBuilder aggregateByKey を使用してみてください。 これに似た何かがトリックを行うはずです:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

しかし、私はそれがすべての大騒ぎの価値があるとは思わない。

上記を念頭に置いて、コードを次のように書き直しました。

スカラ

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

結果

local[6] モード(Intel(R)Xeon(R)CPU E3-1245 V2 @ 3.40GHz)で、エグゼキューターごとに4GBのメモリーが必要(n = 3):

  • Scala-平均:250.00秒、標準偏差:12.49
  • Python-平均:246.66秒、stdev:1.15

その時間のほとんどは、シャッフル、シリアライズ、デシリアライズ、およびその他の二次的なタスクに費やされていると確信しています。 楽しみのために、このマシンで1分以内に同じタスクを実行するPythonの素朴なシングルスレッドコードを次に示します。

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

上記の回答の拡張-

Scalaはpythonに比べて多くの点で高速ですが、pythonがscalaよりも人気を博している正当な理由がいくつかあります。

Python for Apache Sparkは、非常に簡単に習得して使用できます。 しかし、これがPysparkがScalaよりも良い選択である唯一の理由ではありません。 さらにあります。

SparkのPython APIはクラスター上では遅いかもしれませんが、最終的にはデータサイエンティストはScalaに比べてより多くのことができます。 Scalaの複雑さはありません。 インターフェイスはシンプルで包括的です。

コードの読みやすさ、保守、Apache SparkのPython APIの知識について話すことは、Scalaよりもはるかに優れています。

Pythonには、機械学習と自然言語処理に関連するいくつかのライブラリが付属しています。 これは、データ分析を支援し、非常に成熟したタイムテスト済みの統計も備えています。 たとえば、numpy、pandas、scikit-learn、seaborn、matplotlibなどです。

注:ほとんどのデータサイエンティストは、両方のAPIのベストを使用するハイブリッドアプローチを使用します。

最後に、Scalaコミュニティーは、プログラマーにとってあまり役に立たないことがよくあります。 これにより、Pythonは非常に貴重な学習になります。 Javaのような静的に型付けされたプログラミング言語の十分な経験があれば、Scalaをまったく使用しないことを心配するのをやめることができます。






rdd