From 323e7390a5c123c48cc7d6d9be44bee3a7eecd99 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 13 Apr 2016 09:17:46 -0700 Subject: Revert "[SPARK-14154][MLLIB] Simplify the implementation for Kolmogorov–Smirnov test" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit d2a819a6363190b946986ebf6f8001d520098c3b. --- .../mllib/stat/test/KolmogorovSmirnovTest.scala | 77 ++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala index ef284531c9..9748fbf2c9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala @@ -64,10 +64,11 @@ private[stat] object KolmogorovSmirnovTest extends Logging { */ def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = { val n = data.count().toDouble - val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) => - val f = cdf(v) - math.max(f - i / n, (i + 1) / n - f) - }.max() + val localData = data.sortBy(x => x).mapPartitions { part => + val partDiffs = oneSampleDifferences(part, n, cdf) // local distances + searchOneSampleCandidates(partDiffs) // candidates: local extrema + }.collect() + val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme evalOneSampleP(ksStat, n.toLong) } @@ -83,6 +84,74 @@ private[stat] object KolmogorovSmirnovTest extends Logging { testOneSample(data, cdf) } + /** + * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a + * partition + * @param partData `Iterator[Double]` 1 partition of a sorted RDD + * @param n `Double` the total size of the RDD + * @param cdf `Double => Double` a function the calculates the theoretical CDF of a value + * @return `Iterator[(Double, Double)] `Unadjusted (ie. off by a constant) potential extrema + * in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF, + * the second element corresponds to empirical CDF - CDF. We can then search the resulting + * iterator for the minimum of the first and the maximum of the second element, and provide + * this as a partition's candidate extrema + */ + private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double) + : Iterator[(Double, Double)] = { + // zip data with index (within that partition) + // calculate local (unadjusted) empirical CDF and subtract CDF + partData.zipWithIndex.map { case (v, ix) => + // dp and dl are later adjusted by constant, when global info is available + val dp = (ix + 1) / n + val dl = ix / n + val cdfVal = cdf(v) + (dl - cdfVal, dp - cdfVal) + } + } + + /** + * Search the unadjusted differences in a partition and return the + * two extrema (furthest below and furthest above CDF), along with a count of elements in that + * partition + * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF + * and CDFin a partition, which come as a tuple of + * (empirical CDF - 1/N - CDF, empirical CDF - CDF) + * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements + */ + private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)]) + : Iterator[(Double, Double, Double)] = { + val initAcc = (Double.MaxValue, Double.MinValue, 0.0) + val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) => + (math.min(pMin, dl), math.max(pMax, dp), pCt + 1) + } + val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults) + results.iterator + } + + /** + * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after + * adjusting local extrema estimates from individual partitions with the amount of elements in + * preceding partitions + * @param localData `Array[(Double, Double, Double)]` A local array containing the collected + * results of `searchOneSampleCandidates` across all partitions + * @param n `Double`The size of the RDD + * @return The one-sample Kolmogorov Smirnov Statistic + */ + private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double) + : Double = { + val initAcc = (Double.MinValue, 0.0) + // adjust differences based on the number of elements preceding it, which should provide + // the correct distance between empirical CDF and CDF + val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) => + val adjConst = prevCt / n + val dist1 = math.abs(minCand + adjConst) + val dist2 = math.abs(maxCand + adjConst) + val maxVal = Array(prevMax, dist1, dist2).max + (maxVal, prevCt + ct) + } + results._1 + } + /** * A convenience function that allows running the KS test for 1 set of sample data against * a named distribution -- cgit v1.2.3