使用 InterQuartileRange 消除 Spark 中的异常值会导致错误

数据挖掘 阿帕奇火花 斯卡拉
2022-02-20 21:46:35

我有以下函数应该计算给定数据集的异常值。

def interQuartileRangeFiltering(df: DataFrame): DataFrame = {
    @scala.annotation.tailrec
    def inner(cols: List[String], acc: DataFrame): DataFrame = cols match {
      case Nil          => acc
      case column :: xs =>
        val quantiles = acc.stat.approxQuantile(column, Array(0.25, 0.75), 0.0) // TODO: values should come from config
        println(s"$column ${quantiles.size}")
        val q1 = quantiles(0)
        val q3 = quantiles(1)
        val iqr = q1 - q3
        val lowerRange = q1 - 1.5 * iqr
        val upperRange = q3 + 1.5 * iqr
        val filtered = acc.filter(s"$column < $lowerRange or $column > $upperRange")
        inner(xs, filtered)
    }
    inner(df.columns.toList, df)
}

val outlierDF = interQuartileRangeFiltering(incomingDF)

但是发生的情况是,incomingDF 中有一些分类特征,或者换句话说,值为 0 或 1 的二进制类型。如果包含它们,我最终会收到如下错误:

housing_median_age 2
inland 2
island 2
population 2
total_bedrooms 2
near_bay 2
near_ocean 2
median_house_value 0
java.lang.ArrayIndexOutOfBoundsException: 0
  at inner$1(<console>:75)
  at interQuartileRangeFiltering(<console>:83)
  ... 54 elided

我有几个关于如何处理 0 或 1 数据的异常值的问题。我可以在执行 IQR 时忽略它们,这似乎是一种合理的方法,但现在我的问题是,如果我忽略它们,那么我将如何将生成的 DataFrame(在运行上面的递归函数之后)与 OneHotEncoded 列连接回来?

例如,如果原始数据帧,在这种情况下,incomingDF 包含 10000 行并且在异常值检测之后,它最终大约是 9000 行,那么排除的列(OneHotEncoded 列)仍然有 10000,我将如何合并这些两个数据框?不知何故,这让我感到困惑。

有人可以帮我一条出路吗?

1个回答

你至少有两个选择:

  • 选项 1:将完整的数据框传递给函数,可能带有指定要继续或忽略哪些列的参数。每当发现异常值时,都会立即删除整行,最后返回结果数据帧及其所有列。
  • 选项 2:该函数仅适用于数字列,但不是直接删除行,而是返回要删除的行索引列表。然后可以从具有相同原始行数的任何数据帧返回行。