hadoop - write - 如何将数据从Spark SQL导出到CSV
spark write csv (4)
上面使用spark-csv的答案是正确的但是有一个问题 - 库根据数据帧分区创建了几个文件。 这不是我们通常需要的。 因此,您可以将所有分区合并为一个:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
并将lib的输出(名称“part-00000”)重命名为所需的文件名。
此博客文章提供了更多详细信息: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/ : https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
此命令适用于HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
但是使用Spark SQL我收到org.apache.spark.sql.hive.HiveQl
堆栈跟踪的错误:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
请指导我在Spark SQL中编写导出到CSV功能。
在spark-csv的帮助下,我们可以写入CSV文件。
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
最简单的方法是映射DataFrame的RDD并使用mkString:
df.rdd.map(x=>x.mkString(","))
从Spark 1.5开始(或甚至在此之前) df.map(r=>r.mkString(","))
如果你想要CSV转义你也可以使用apache commons lang。 例如,这是我们正在使用的代码
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
由于Spark 2.X
spark-csv
被集成为本机数据源 。 因此,必要的声明简化为(windows)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
或UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")