pyspark教程 - spark python




如何在Pyspark中使用滑动窗口对时间序列数据进行数据转换 (2)

我试图基于时间序列数据的滑动窗口提取功能。 在Scala中,似乎有一个基于 这篇文章 和 文档 sliding 功能

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

我的问题是PySpark中有类似的功能吗? 或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?



为了增加 venuktan 的答案,这里是如何使用Spark SQL创建一个基于时间的滑动窗口并保留窗口的全部内容,而不是采用它的聚合。 在我将时间序列数据预处理到滑动窗口以输入Spark ML的用例中需要这样做。

这种方法的一个限制是我们假设你想随着时间推移滑动窗口。

首先,您可以创建Spark DataFrame,例如通过读取CSV文件:

df = spark.read.csv('foo.csv')

我们假设您的CSV文件有两列:其中一列是unix时间戳,另一列是要从中提取滑动窗口的列。

from pyspark.sql import functions as f

window_duration = '1000 millisecond'
slide_duration = '500 millisecond'

df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
    .groupBy(f.window("_c0", window_duration, slide_duration)) \
    .agg(f.collect_list(f.array('_c1'))) \
    .withColumnRenamed('collect_list(array(_c1))', 'sliding_window')

额外:要将此数组列转换为Spark ML所需的DenseVector格式, 请参阅此处的UDF方法 。

额外奖励:要取消嵌套生成的列,以便滑动窗口的每个元素都有自己的列,请 在此处尝试此方法 。

我希望这有帮助,如果我能澄清任何事情,请告诉我。







pyspark