scala - spark time series library



星火斯卡拉-如何做count()通过调节两行 (1)

要访问DataFrame中的上一个/下一个行,我们可以使用Window函数。 在这种情况下,我们将使用lag来访问前一个结束时间,按machineId进行分组。

import org.apache.spark.sql.expressions.Window

// Dataframe Schema
case class MachineData(id:String, start:Int, end:Int)
// Sample Data
machineDF.show
+---+-----+---+
| id|start|end|
+---+-----+---+
|  1|    0|  3|
|  1|    4|  8|
|  1|   10| 20|
|  1|   20| 31|
|  1|  412|578|
|  2|  231|311|
|  2|  781|790|
+---+-----+---+


// define the window as a partition over machineId, ordered by start (time)
val byMachine = Window.partitionBy($"id").orderBy($"start")
// we define a new column, "previous end" using the Lag Window function over the previously defined window
val prevEnd = lag($"end", 1).over(byMachine)

// new DF with the prevEnd column
val withPrevEnd = machineDF.withColumn("prevEnd", prevEnd)
withPrevEnd.show

+---+-----+---+-------+
| id|start|end|prevEnd|
+---+-----+---+-------+
|  1|    0|  3|   null|
|  1|    4|  8|      3|
|  1|   10| 20|      8|
|  1|   20| 31|     20|
|  1|  412|578|     31|
|  2|  231|311|   null|
|  2|  781|790|    311|
+---+-----+---+-------+

// we're calculating the idle intervals as the numerical diff as an example
val idleIntervals = withPrevEnd.withColumn("diff", $"start"-$"prevEnd")
idleIntervals.show

+---+-----+---+-------+----+
| id|start|end|prevEnd|diff|
+---+-----+---+-------+----+
|  1|    0|  3|   null|null|
|  1|    4|  8|      3|   1|
|  1|   10| 20|      8|   2|
|  1|   20| 31|     20|   0|
|  1|  412|578|     31| 381|
|  2|  231|311|   null|null|
|  2|  781|790|    311| 470|
+---+-----+---+-------+----+

// to calculate the total, we are summing over the differences. Adapt this as your business logic requires.
val totalIdleIntervals = idleIntervals.select($"id",$"diff").groupBy($"id").agg(sum("diff"))

+---+---------+
| id|sum(diff)|
+---+---------+
|  1|      384|
|  2|      470|
+---+---------+

我是新手火花斯卡拉,我很抱歉提出愚蠢的问题(如果是)。 我被困在一个问题,我简化如下:

有一个三列的数据框,“machineID”是一台机器的身份。 “startTime”是任务的开始时间标记。 “endTime”是任务的结束时间标记。

我的目标是计算每台机器有多少空闲间隔。
例如,
在下表中,第一行和第二行显示在时间0开始并且在时间3结束并且在时间4再次开始的机器#1,因此时间间隔[3,4]是空闲的。 对于第3行和第4行,机器#1在时间10开始并在时间20结束,并且立即再次开始,所以没有空闲时间。

machineID, startTime, endTime  
1, 0, 3  
1, 4, 8  
1, 10, 20  
1, 20, 31  
...  
1, 412, 578  
...  
2, 231, 311  
2, 781, 790  
...  

数据帧已经是groupBy(“machineID”)。
我正在使用spark 2.0.1和scala 2.11.8





time-series