根据先前值的计算过滤 Apache Spark 中的异常值

数据挖掘 阿帕奇火花 地理空间
2022-02-19 13:35:04

我正在使用具有以下架构的 Spark 2.0 Dataframes 处理地理空间数据:

root
 |-- date: timestamp (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- accuracy: double (nullable = true)
 |-- track_id: long (nullable = true)

我已经看到位置信号跳到了完全不同的地方。奇怪的是,信号会持续一段时间,比如在远程位置大约 25 秒或 5 个样本,然后跳回我所在的位置。

我想通过计算点之间的速度来计算当前和“最后一个有效记录”之间的速度来消除这些异常值。如果速度高于给定阈值,则应删除当前记录,“最后一个有效记录”保持不变。如果速度低于阈值,则将当前记录添加到结果数据框中并成为新的“最后一个有效记录”。

我将 Spark 2.0 与 Dataframes 一起使用。

任何有关如何实施此策略或任何更好策略的建议都将受到高度赞赏。谢谢。

PS:我在stackoverflow中问了同样的问题,并给出了具体的实现。但是,由于我不确定这是否是正确的方法,并且不想偏向某个 Spark 方法的答案,所以我在这里征求任何建议。 https://stackoverflow.com/questions/41002844/how-to-filter-outlier-rows-from-spark-dataframe-based-on-distance-to-previous-va

1个回答

这实际上是时间序列数据的一个普遍问题:您需要根据序列中的一个或多个值来实现一些逻辑。你总是有两个选择:

  1. 通过某个模块提供时间序列,该模块在每个数据点到达时进行计算
  2. 使用“电子表格法”计算最终到达目标的一系列列

第一种方法的优点是您可以使用相同的模块来处理您的实时数据。第二种方法的优点是速度非常快并且通常更容易实现。

由于您已经在 Spark 数据集中,因此策略如下:

  1. 计算速度列:ptpt1在哪里p是位置
  2. 计算“跳跃”列:如果速度超过某个阈值,则为 1,如果低于某个阈值,则为 -1,否则为 0
  3. 计算一个“跳转和”列:跳转列的累计和
  4. 坏数据的跳跃和为1;过滤掉它们

这是你如何做到的:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val ss: SparkSession = SparkSession.builder.getOrCreate()

// note the file must be on each executor in the same directory
val ds = ss.read
   .option("header", "true")
   .option("inferSchema", "true")
   .csv("file:///home/peter/data.csv")

val w = Window.partitionBy().orderBy("datetime")
val threshold = 10
def jump(v: Double): Int = if (v > threshold) 1 else if (v < -threshold) -1 else 0
val sqlJump = udf(jump _)

val cleanDS = ds
    .withColumn("speed", $"position" - lag($"position", 1).over(w.rowsBetween(-1, -1)))
    .withColumn("jump", sqlJump($"speed"))
    .withColumn("jumpsum", sum($"jump").over(w.rowsBetween(Long.MinValue, 0)))

这是输出数据集的样子(我没有删除坏行,所以你可以看到计算):

+--------+--------+-----+----+-------+
|datetime|position|speed|jump|jumpsum|
+--------+--------+-----+----+-------+
|       1|       1| null|null|   null|
|       2|       1|    0|   0|      0|
|       3|       1|    0|   0|      0|
|       4|       1|    0|   0|      0|
|       5|       1|    0|   0|      0|
|       6|       2|    1|   0|      0|
|       7|       1|   -1|   0|      0|
|       8|       1|    0|   0|      0|
|       9|      46|   45|   1|      1|
|      10|      45|   -1|   0|      1|
|      11|      48|    3|   0|      1|
|      12|      45|   -3|   0|      1|
|      13|       1|  -44|  -1|      0|
|      14|       2|    1|   0|      0|
|      15|       1|   -1|   0|      0|
+--------+--------+-----+----+-------+

“data.csv”只是该数据集的前两列:

datetime,position
1,1
2,1
3,1
4,1

...ETC。

剩下要做的就是过滤掉jumpsum === 1.