aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorYuhao Yang <hhbyyh@gmail.com>2016-03-29 09:16:50 -0700
committerSean Owen <sowen@cloudera.com>2016-03-29 09:16:50 -0700
commitd2a819a6363190b946986ebf6f8001d520098c3b (patch)
tree461d9dde80017357a23d3f0dc0b2e10f8fb05888 /mllib/src/main
parenta632bb56f8867df39a78d7f01fb870f548b09815 (diff)
downloadspark-d2a819a6363190b946986ebf6f8001d520098c3b.tar.gz
spark-d2a819a6363190b946986ebf6f8001d520098c3b.tar.bz2
spark-d2a819a6363190b946986ebf6f8001d520098c3b.zip
[SPARK-14154][MLLIB] Simplify the implementation for Kolmogorov–Smirnov test
## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-14154 I just read the code for KolmogorovSmirnovTest and find it could be much simplified following the original definition. Send a PR for discussion ## How was this patch tested? unit test Author: Yuhao Yang <hhbyyh@gmail.com> Closes #11954 from hhbyyh/ksoptimize.
Diffstat (limited to 'mllib/src/main')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala77
1 files changed, 4 insertions, 73 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index baf9e5e7d1..0ec8975fed 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -64,11 +64,10 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
*/
def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
val n = data.count().toDouble
- val localData = data.sortBy(x => x).mapPartitions { part =>
- val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
- searchOneSampleCandidates(partDiffs) // candidates: local extrema
- }.collect()
- val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
+ val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
+ val f = cdf(v)
+ math.max(f - i / n, (i + 1) / n - f)
+ }.max()
evalOneSampleP(ksStat, n.toLong)
}
@@ -85,74 +84,6 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
}
/**
- * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
- * partition
- * @param partData `Iterator[Double]` 1 partition of a sorted RDD
- * @param n `Double` the total size of the RDD
- * @param cdf `Double => Double` a function the calculates the theoretical CDF of a value
- * @return `Iterator[(Double, Double)] `Unadjusted (ie. off by a constant) potential extrema
- * in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
- * the second element corresponds to empirical CDF - CDF. We can then search the resulting
- * iterator for the minimum of the first and the maximum of the second element, and provide
- * this as a partition's candidate extrema
- */
- private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
- : Iterator[(Double, Double)] = {
- // zip data with index (within that partition)
- // calculate local (unadjusted) empirical CDF and subtract CDF
- partData.zipWithIndex.map { case (v, ix) =>
- // dp and dl are later adjusted by constant, when global info is available
- val dp = (ix + 1) / n
- val dl = ix / n
- val cdfVal = cdf(v)
- (dl - cdfVal, dp - cdfVal)
- }
- }
-
- /**
- * Search the unadjusted differences in a partition and return the
- * two extrema (furthest below and furthest above CDF), along with a count of elements in that
- * partition
- * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
- * and CDFin a partition, which come as a tuple of
- * (empirical CDF - 1/N - CDF, empirical CDF - CDF)
- * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
- */
- private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
- : Iterator[(Double, Double, Double)] = {
- val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
- val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
- (math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
- }
- val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
- results.iterator
- }
-
- /**
- * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
- * adjusting local extrema estimates from individual partitions with the amount of elements in
- * preceding partitions
- * @param localData `Array[(Double, Double, Double)]` A local array containing the collected
- * results of `searchOneSampleCandidates` across all partitions
- * @param n `Double`The size of the RDD
- * @return The one-sample Kolmogorov Smirnov Statistic
- */
- private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
- : Double = {
- val initAcc = (Double.MinValue, 0.0)
- // adjust differences based on the number of elements preceding it, which should provide
- // the correct distance between empirical CDF and CDF
- val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
- val adjConst = prevCt / n
- val dist1 = math.abs(minCand + adjConst)
- val dist2 = math.abs(maxCand + adjConst)
- val maxVal = Array(prevMax, dist1, dist2).max
- (maxVal, prevCt + ct)
- }
- results._1
- }
-
- /**
* A convenience function that allows running the KS test for 1 set of sample data against
* a named distribution
* @param data the sample data that we wish to evaluate