scala - länge - title tag wordpress




Wie definiere ich die Partitionierung von DataFrame? (4)

Ich habe begonnen, Spark SQL und DataFrames in Spark 1.4.0 zu verwenden. Ich möchte in Scala einen benutzerdefinierten Partitionierer für DataFrames definieren, verstehe aber nicht, wie das geht.

Eine der Datentabellen, mit denen ich arbeite, enthält eine Liste von Transaktionen nach Konto, wie im folgenden Beispiel dargestellt.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Zumindest anfänglich werden die meisten Berechnungen zwischen den Transaktionen innerhalb eines Kontos durchgeführt. Ich möchte also, dass die Daten partitioniert werden, damit sich alle Transaktionen für ein Konto in derselben Spark-Partition befinden.

Aber ich sehe keinen Weg, dies zu definieren. Die DataFrame-Klasse verfügt über eine Methode mit dem Namen 'repartition (Int)', mit der Sie die Anzahl der zu erstellenden Partitionen angeben können. Es ist jedoch keine Methode verfügbar, um einen benutzerdefinierten Partitionierer für einen DataFrame zu definieren, wie er für eine RDD angegeben werden kann.

Die Quelldaten werden in Parkett gespeichert. Ich habe festgestellt, dass Sie beim Schreiben eines DataFrame in Parquet eine Spalte angeben können, nach der partitioniert werden soll. Vermutlich kann ich Parquet dann anweisen, die Daten nach der Spalte "Konto" zu partitionieren. Aber es könnte Millionen von Konten geben, und wenn ich Parquet richtig verstehe, würde es für jedes Konto ein eigenes Verzeichnis erstellen, sodass dies nicht nach einer vernünftigen Lösung klang.

Gibt es eine Möglichkeit, Spark zu veranlassen, diesen DataFrame so zu partitionieren, dass sich alle Daten für ein Konto in derselben Partition befinden?


Funke> = 2.3.0

SPARK-22614 legt die Bereichspartitionierung SPARK-22614 .

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 macht die Partitionierung externer Formate in der Datenquellen-API v2 SPARK-22389 .

Funke> = 1.6.0

In Spark> = 1.6 ist es möglich, die Partitionierung nach Spalten zum Abfragen und Zwischenspeichern zu verwenden. Siehe: SPARK-11410 und SPARK-4849 Verwendung der repartition :

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Im Gegensatz zu RDDs Spark Dataset (einschließlich Dataset[Row] aka DataFrame ) derzeit keinen benutzerdefinierten Partitionierer verwenden. In der Regel können Sie dies beheben, indem Sie eine künstliche Partitionierungssäule erstellen, die Ihnen jedoch nicht die gleiche Flexibilität bietet.

Funke <1.6.0:

Sie können Eingabedaten vorab partitionieren, bevor Sie einen DataFrame erstellen

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Da DataFrame Erstellung von DataFrame aus einer RDD nur eine einfache DataFrame erfordert, sollte das vorhandene Partitionslayout beibehalten werden *:

assert(df.rdd.partitions == partitioned.partitions)

So können Sie vorhandenen DataFrame partitionieren:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Es sieht also so aus, als wäre es nicht unmöglich. Die Frage bleibt, ob es überhaupt Sinn macht. Ich werde behaupten, dass dies meistens nicht der Fall ist:

  1. Repartitionierung ist ein teurer Prozess. In einem typischen Szenario müssen die meisten Daten serialisiert, gemischt und deserialisiert werden. Andererseits ist die Anzahl der Vorgänge, die von vorpartitionierten Daten profitieren können, relativ gering und weiter begrenzt, wenn die interne API nicht darauf ausgelegt ist, diese Eigenschaft zu nutzen.

    • Beitritt in einigen Szenarien, aber es würde eine interne Unterstützung erfordern,
    • Fensterfunktionen ruft mit passendem Partitioner auf. Wie oben, beschränkt auf eine einzelne Fensterdefinition. Es ist jedoch bereits intern partitioniert, sodass die Vor-Partitionierung redundant sein kann.
    • einfache Aggregationen mit GROUP BY - Es ist möglich, den Speicherbedarf der temporären Puffer ** zu reduzieren, aber die Gesamtkosten sind viel höher. Mehr oder weniger gleichwertig mit groupByKey.mapValues(_.reduce) (aktuelles Verhalten) vs. reduceByKey (Pre-Partitioning). In der Praxis wahrscheinlich nicht nützlich.
    • Datenkomprimierung mit SqlContext.cacheTable . Da OrderedRDDFunctions.repartitionAndSortWithinPartitions die Lauflängencodierung verwendet wird, kann durch Anwenden von OrderedRDDFunctions.repartitionAndSortWithinPartitions das Komprimierungsverhältnis verbessert werden.
  2. Die Leistung hängt stark von der Verteilung der Schlüssel ab. Wenn es schief ist, führt dies zu einer suboptimalen Ressourcennutzung. Im schlimmsten Fall ist es unmöglich, den Job überhaupt zu beenden.

  3. Bei der Verwendung einer deklarativen API auf hoher Ebene müssen Sie sich von den Implementierungsdetails auf niedriger Ebene isolieren. Wie bereits von @dwysakowicz und @RomiKuntsman eine Optimierung Aufgabe des Catalyst Optimizer . Es ist ein ziemlich raffiniertes Biest und ich bezweifle wirklich, dass Sie es leicht verbessern können, ohne viel tiefer in seine inneren Verhältnisse einzutauchen.

Verwandte konzepte

Partitionierung mit JDBC-Quellen :

JDBC-Datenquellen unterstützen predicates . Es kann wie folgt verwendet werden:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Es wird eine einzelne JDBC-Partition pro Prädikat erstellt. Beachten Sie, dass in der resultierenden Tabelle Duplikate angezeigt werden, wenn mit einzelnen Prädikaten erstellte Mengen nicht disjunkt sind.

partitionBy Methode in DataFrameWriter :

Spark DataFrameWriter bietet die Methode partitionBy, mit der Daten beim Schreiben "partitioniert" werden können. Es trennt die Daten beim Schreiben mithilfe der bereitgestellten Spalten

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Dies ermöglicht das Drücken der Vergleichselemente beim Lesen für Abfragen basierend auf dem Schlüssel:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

Es entspricht jedoch nicht DataFrame.repartition . Insbesondere Aggregationen wie:

val cnts = df1.groupBy($"k").sum()

benötigt weiterhin TungstenExchange :

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy Methode in DataFrameWriter (Spark> = 2.0):

bucketBy hat ähnliche Anwendungen wie partitionBy , ist jedoch nur für Tabellen verfügbar ( saveAsTable ). Bucketing-Informationen können zur Optimierung von Joins verwendet werden:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Mit Partitionslayout meine ich nur eine Datenverteilung. partitioned RDD hat keinen Partitionierer mehr. ** Vorausgesetzt keine frühe Projektion. Wenn die Aggregation nur eine kleine Teilmenge der Spalten abdeckt, gibt es wahrscheinlich überhaupt keinen Gewinn.


Also, um mit einer Antwort zu beginnen:) - Das kannst du nicht

Ich bin kein Experte, aber soweit ich DataFrames verstehe, sind sie nicht gleichbedeutend mit rdd und DataFrame hat keinen Partitionierer.

Im Allgemeinen ist es die Idee von DataFrame, eine andere Abstraktionsebene bereitzustellen, die solche Probleme selbst handhabt. Die Abfragen in DataFrame werden in einen logischen Plan übersetzt, der in Operationen auf RDDs weiter umgesetzt wird. Die von Ihnen vorgeschlagene Partitionierung wird wahrscheinlich automatisch angewendet oder sollte es zumindest sein.

Wenn Sie SparkSQL nicht vertrauen, dass es eine Art optimalen Job liefert, können Sie DataFrame wie in den Kommentaren vorgeschlagen immer in RDD [Row] umwandeln.


In Spark <1.6 Wenn Sie einen HiveContext erstellen, können Sie nicht den einfachen alten SqlContext HiveContext , sondern HiveQL DISTRIBUTE BY colX... (stellt sicher, dass jeder von N Reduzierern nicht überlappende Bereiche von x erhält) & CLUSTER BY colX... (Verknüpfung für Verteilen nach und Sortieren nach) zum Beispiel;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Ich bin nicht sicher, wie das mit der Spark DF-API zusammenpasst. Diese Schlüsselwörter werden im normalen SqlContext nicht unterstützt (beachten Sie, dass Sie keinen Hive-Metastore benötigen, um den HiveContext verwenden zu können).

BEARBEITEN : Spark 1.6+ hat dies jetzt in der nativen DataFrame-API


Verwenden Sie den DataFrame, der zurückgegeben wird von:

yourDF.orderBy(account)

Es gibt keine explizite Möglichkeit, partitionBy für einen DataFrame zu verwenden, nur für ein PairRDD. Wenn Sie jedoch einen DataFrame sortieren, wird dies in seinem LogicalPlan verwendet, und dies hilft, wenn Sie Berechnungen für jedes Konto durchführen müssen.

Ich bin gerade auf dasselbe Problem gestoßen, nämlich einen Datenrahmen, den ich nach Konto partitionieren möchte. Ich gehe davon aus, dass Sie, wenn Sie "möchten, dass die Daten so partitioniert werden, dass sich alle Transaktionen für ein Konto in derselben Spark - Partition befinden" sagen, diese für die Skalierung und Leistung benötigen, Ihr Code jedoch nicht davon abhängt (wie bei der Verwendung von mapPartitions() etc), richtig?







partitioning