scala - 分散処理 - spark メモリ 不足




Scala Sparkの分散マップ (2)

Sparkは分散Mapコレクションタイプをサポートしますか?

キーと値のペアであるHashMap [String、String]がある場合、これを分散Mapコレクション型に変換できますか? 要素にアクセスするために私は "フィルタ"を使用することができますが、私はこれが地図と同様に実行することを疑いますか?


簡単な答え:部分的に。

Map[A,B]を最初に一連の(k,v)ペアに強制することによってMap[A,B]RDD[(A,B)]変換できますが、そうすることでマップのキーはセット。 すなわち。 Map構造の意味を失います。

実用的な観点からは、 kvRdd.lookup(element)を使用して要素を対応する値に解決することはできますが、 kvRdd.lookup(element)のように単一のルックアップ値があるという保証はないため、結果はシーケンスになります。

物事を明確にするためのスパークシェルの例:

val englishNumbers = Map(1 -> "one", 2 ->"two" , 3 -> "three")
val englishNumbersRdd = sc.parallelize(englishNumbers.toSeq)

englishNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one) 

val spanishNumbers = Map(1 -> "uno", 2 -> "dos", 3 -> "tres")
val spanishNumbersRdd = sc.parallelize(spanishNumbers.toList)

val bilingueNumbersRdd = englishNumbersRdd union spanishNumbersRdd

bilingueNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one, uno)

私はいくつかの新しい情報を見つけたので、私は自分のコメントを答えに変えたいと思った。 @maasgはすでに標準のlookup関数をカバーしていlookup 。RDDのパーティショナーがNoneの場合、ルックアップはいずれにせよフィルタを使用するため注意してください。 火花の上にある(K、V)ストアに関しては、これが進行中であるように見えますが、使用可能なpull requestがhereなされhere 。 これが使用例です。

import org.apache.spark.rdd.IndexedRDD

// Create an RDD of key-value pairs with Long keys.
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing
// the entries.
val indexed = IndexedRDD(rdd).cache()

// Perform a point update.
val indexed2 = indexed.put(1234L, 10873).cache()
// Perform a point lookup. Note that the original IndexedRDD remains
// unmodified.
indexed2.get(1234L) // => Some(10873)
indexed.get(1234L) // => Some(0)

// Efficiently join derived IndexedRDD with original.
val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0)
indexed3.collect // => Array((1234L, 10873))

// Perform insertions and deletions.
val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache()
indexed2.get(-100L) // => None
indexed4.get(-100L) // => Some(111)
indexed2.get(999L) // => Some(0)
indexed4.get(999L) // => None

プルリクエストは順調に受信されており、おそらく将来のバージョンのsparkに含まれるように思われるので、そのプルリクエストを自分のコードで使用しても安全です。 好奇心旺盛だった場合のJIRAのチケットはこちらです





apache-spark