scala - spark - title tag




Wie ändere ich Spaltentypen im DataFrame von Spark SQL? (14)

Edit: Neueste Version

Seit spark 2.x können Sie .withColumn . Überprüfen Sie die Dokumente hier:

https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

Älteste Antwort

Seit Spark Version 1.4 können Sie die Umwandlungsmethode mit DataType auf die Spalte anwenden:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

Wenn Sie SQL-Ausdrücke verwenden, können Sie auch Folgendes tun:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

Weitere Informationen finden Sie in den Dokumenten: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

Angenommen, ich mache etwas wie:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  

aber ich wollte wirklich das year als Int (und vielleicht einige andere Spalten transformieren).

Das Beste, was ich mir einfallen lassen konnte, ist

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

Das ist ein bisschen gewunden.

Ich komme aus R und bin es gewohnt, schreiben zu können, z

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)

Ich vermisse wahrscheinlich etwas, da es einen besseren Weg geben sollte, dies in spark / scala zu tun ...


Bei den Antworten, die darauf hindeuten, cast zu verwenden, ist die cast-Methode in Spark 1.4.1 fehlerhaft.

Beispielsweise hat ein Datenrahmen mit einer Zeichenfolgenspalte, die den Wert "8182175552014127960" hat, wenn er in bigint umgewandelt wird, den Wert "8182175552014128100".

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

Wir mussten uns mit vielen Problemen auseinandersetzen, bevor wir diesen Fehler fanden, weil wir große Spalten in der Produktion hatten.


Das funktioniert also nur dann wirklich, wenn Sie Probleme beim Speichern auf einem jdbc-Treiber wie sqlserver haben, aber es ist wirklich hilfreich für Fehler, die Sie mit Syntax und Typen antreffen.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)

Diese Methode löscht die alte Spalte und erstellt neue Spalten mit denselben Werten und neuem Datentyp. Meine ursprünglichen Datentypen beim Erstellen des DataFrame waren:

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

Danach habe ich folgenden Code ausgeführt, um den Datentyp zu ändern: -

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)

Danach ergab sich folgendes Ergebnis:

df.withColumn("year", df("year").cast(IntegerType))

Generieren Sie ein einfaches Dataset mit fünf Werten und konvertieren Sie int in einen string :

val df = spark.range(5).select( col("id").cast("string") )

Ich denke, das ist für mich viel lesbarer.

    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()

Dadurch wird Ihre IntegerType in IntegerType konvertiert, wobei temporäre Spalten erstellt und diese Spalten IntegerType . Wenn Sie in einen anderen Datentyp konvertieren möchten, können Sie die Typen im Paket org.apache.spark.sql.types überprüfen.


Mit Spark Sql 2.4.0 können Sie das tun:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")

Sie können selectExpr , um es ein wenig sauberer zu machen:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")

Sie können den folgenden Code verwenden.

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

IntegerType die IntegerType Spalte IntegerType wird.


Um das Jahr von string in int zu konvertieren, können Sie dem csv-Reader die folgende Option hinzufügen: "inferSchema" -> "true", siehe DataBricks-Dokumentation


Wenn Sie zuerst einen Typ gießen möchten, dann ist dies:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

Bei gleichem Spaltennamen wird die Spalte durch eine neue ersetzt. Sie müssen keine Schritte hinzufügen und löschen.

Zweitens über Scala vs R.
Dies ist der Code, der RI am ähnlichsten ist:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

Die Codelänge ist jedoch etwas länger als die von R. Das hat nichts mit der Ausführlichkeit der Sprache zu tun. In R ist das Mutieren eine spezielle Funktion für R-Datenrahmen, während Sie in Scala dank seiner Ausdruckskraft leicht eine Ad-hoc-Funktion ausführen können.
Mit anderen Worten, spezifische Lösungen werden vermieden, da die Grundlage ausreicht, um schnell und einfach Ihre eigenen Domain-Sprachfunktionen zu erstellen.

Anmerkung: df.columns ist überraschenderweise ein Array[String] anstelle von Array[Column] . Vielleicht möchten sie, dass es wie der Datenrahmen von Python pandas aussieht.


[EDIT: März 2016: Danke für die Stimmen! Obwohl dies wirklich nicht die beste Antwort ist, denke ich, dass die Lösungen, die auf withColumn , withColumnRenamed und cast basieren, die von msemelman, Martin Senne und anderen vorgeschlagen wurden, einfacher und sauberer sind.

Ich denke, Ihr Ansatz ist in DataFrame Denken Sie daran, dass ein Spark- DataFrame eine (unveränderliche) RDD von Zeilen ist. Wir ersetzen also nie wirklich eine Spalte, DataFrame jedes Mal einen neuen DataFrame mit einem neuen Schema.

Angenommen, Sie haben eine Original-DF mit dem folgenden Schema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

Einige UDFs sind in einer oder mehreren Spalten definiert:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

Das Ändern von Spaltentypen oder sogar das Erstellen eines neuen DataFrames aus einem anderen kann folgendermaßen geschrieben werden:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

was ergibt:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

Dies kommt Ihrer eigenen Lösung ziemlich nahe. Durch Beibehalten der udf val und anderer Transformationen als separate udf val der Code udf val lesbarer und wiederverwendbarer.


df.select($"long_col".cast(IntegerType).as("int_col"))

Another solution is as follows:
1) Keep "inferSchema" as False
2) While running 'Map' functions on the row, you can read 'asString' (row.getString...)

<Code>
        //Read CSV and create dataset
        Dataset<Row> enginesDataSet = sparkSession
                    .read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .option("inferSchema","false")
                    .load(args[0]);

        JavaRDD<Box> vertices = enginesDataSet
                    .select("BOX","BOX_CD")
                    .toJavaRDD()
                    .map(new Function<Row, Box>() {
                        @Override
                        public Box call(Row row) throws Exception {
                            return new Box((String)row.getString(0),(String)row.get(1));
                        }
                    });
</Code>






apache-spark-sql