python - pyspark创建dataframe - spark dataframe修改列名




如何将PySpark中的表数据框导出到CSV? (4)

如果数据帧适合驱动程序内存,并且您想保存到本地文件系统,则可以使用 toPandas 方法将 Spark DataFrame 转换为本地 Pandas DataFrame ,然后只需使用 to_csv

df.toPandas().to_csv('mycsv.csv')

否则,您可以使用 spark-csv

  • 火花1.3

    df.save('mycsv.csv', 'com.databricks.spark.csv')
  • 火花1.4+

    df.write.format('com.databricks.spark.csv').save('mycsv.csv')

在Spark 2.0+中,您可以直接使用 csv 数据源:

df.write.csv('mycsv.csv')

我正在使用Spark 1.3.1(PySpark),并且已经使用SQL查询生成了一个表。 我现在有一个对象,它是一个 DataFrame 。 我想将此 DataFrame 对象(我称其为“表”)导出到一个csv文件,以便我可以操纵它并绘制列。 如何将 DataFrame “表”导出到csv文件?

谢谢!


如果无法使用spark-csv,则可以执行以下操作:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")

如果您需要使用换行符或逗号来处理字符串,则这些字符串将不起作用。 用这个:

import csv
import cStringIO

def row2csv(row):
    buffer = cStringIO.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s).encode("utf-8") for s in row])
    buffer.seek(0)
    return buffer.read().strip()

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")

怎么样(在你不想一个班轮)?

for row in df.collect():
    d = row.asDict()
    s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
    f.write(s)

f是打开的文件描述符。 分隔符也是TAB字符,但是很容易更改为所需的任何字符。


您需要将Dataframe重新划分为一个分区,然后以Unix文件系统格式定义文件的格式,路径和其他参数,然后就可以开始了,

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')

阅读有关 重新分区功能的 更多信息阅读有关 保存功能的 更多信息

但是,重新分区是一项代价高昂的函数,并且toPandas()最糟糕。 尝试在以前的语法中使用.coalesce(1)代替.repartition(1)以获得更好的性能。

阅读有关 分区功能与合并功能的更多信息





export-to-csv