这实际上是时间序列数据的一个普遍问题:您需要根据序列中的一个或多个值来实现一些逻辑。你总是有两个选择:
- 通过某个模块提供时间序列,该模块在每个数据点到达时进行计算
- 使用“电子表格法”计算最终到达目标的一系列列
第一种方法的优点是您可以使用相同的模块来处理您的实时数据。第二种方法的优点是速度非常快并且通常更容易实现。
由于您已经在 Spark 数据集中,因此策略如下:
- 计算速度列:pt−pt−1在哪里p是位置
- 计算“跳跃”列:如果速度超过某个阈值,则为 1,如果低于某个阈值,则为 -1,否则为 0
- 计算一个“跳转和”列:跳转列的累计和
- 坏数据的跳跃和为1;过滤掉它们
这是你如何做到的:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val ss: SparkSession = SparkSession.builder.getOrCreate()
// note the file must be on each executor in the same directory
val ds = ss.read
.option("header", "true")
.option("inferSchema", "true")
.csv("file:///home/peter/data.csv")
val w = Window.partitionBy().orderBy("datetime")
val threshold = 10
def jump(v: Double): Int = if (v > threshold) 1 else if (v < -threshold) -1 else 0
val sqlJump = udf(jump _)
val cleanDS = ds
.withColumn("speed", $"position" - lag($"position", 1).over(w.rowsBetween(-1, -1)))
.withColumn("jump", sqlJump($"speed"))
.withColumn("jumpsum", sum($"jump").over(w.rowsBetween(Long.MinValue, 0)))
这是输出数据集的样子(我没有删除坏行,所以你可以看到计算):
+--------+--------+-----+----+-------+
|datetime|position|speed|jump|jumpsum|
+--------+--------+-----+----+-------+
| 1| 1| null|null| null|
| 2| 1| 0| 0| 0|
| 3| 1| 0| 0| 0|
| 4| 1| 0| 0| 0|
| 5| 1| 0| 0| 0|
| 6| 2| 1| 0| 0|
| 7| 1| -1| 0| 0|
| 8| 1| 0| 0| 0|
| 9| 46| 45| 1| 1|
| 10| 45| -1| 0| 1|
| 11| 48| 3| 0| 1|
| 12| 45| -3| 0| 1|
| 13| 1| -44| -1| 0|
| 14| 2| 1| 0| 0|
| 15| 1| -1| 0| 0|
+--------+--------+-----+----+-------+
“data.csv”只是该数据集的前两列:
datetime,position
1,1
2,1
3,1
4,1
...ETC。
剩下要做的就是过滤掉jumpsum === 1.