aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Xiangrui Meng <meng@databricks.com> 2016-04-13 09:17:46 -0700
committer Xiangrui Meng <meng@databricks.com> 2016-04-13 09:17:46 -0700
commit 323e7390a5c123c48cc7d6d9be44bee3a7eecd99 (patch)
tree 1406527341e895d20b30b42505e48e15000744d1
parent dd11e401e45563b4bdc9829f5d23b68dacac8caf (diff)
download spark-323e7390a5c123c48cc7d6d9be44bee3a7eecd99.tar.gz
spark-323e7390a5c123c48cc7d6d9be44bee3a7eecd99.tar.bz2
spark-323e7390a5c123c48cc7d6d9be44bee3a7eecd99.zip
Revert "[SPARK-14154][MLLIB] Simplify the implementation for Kolmogorov–Smirnov test"
This reverts commit d2a819a6363190b946986ebf6f8001d520098c3b.
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala 77
1 files changed, 73 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index ef284531c9..9748fbf2c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -64,10 +64,11 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
*/
def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
val n = data.count().toDouble
- val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
- val f = cdf(v)
- math.max(f - i / n, (i + 1) / n - f)
- }.max()
+ val localData = data.sortBy(x => x).mapPartitions { part =>
+ val partDiffs = oneSampleDifferences(part, n, cdf) // unadjusted distances, indexed per partition
+ searchOneSampleCandidates(partDiffs) // at most one (min, max, count) candidate per partition
+ }.collect()
+ val ksStat = searchOneSampleStatistic(localData, n) // fold assumes candidates are in partition order
evalOneSampleP(ksStat, n.toLong)
}
@@ -84,6 +85,74 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
}
/**
+ * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
+ * partition
+ * @param partData `Iterator[Double]` 1 partition of a sorted RDD
+ * @param n `Double` the total size of the RDD
+ * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value
+ * @return `Iterator[(Double, Double)]` Unadjusted (i.e. off by a constant) potential extrema
+ * in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
+ * the second element corresponds to empirical CDF - CDF. We can then search the resulting
+ * iterator for the minimum of the first and the maximum of the second element, and provide
+ * this as a partition's candidate extrema
+ */
+ private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
+ : Iterator[(Double, Double)] = {
+ // pair each value with its 0-based index within this partition (not its global rank),
+ // then subtract the theoretical CDF from the partition-local (unadjusted) empirical CDF
+ partData.zipWithIndex.map { case (v, ix) =>
+ // dp and dl omit the (preceding element count / n) offset, added once global info is available
+ val dp = (ix + 1) / n // local empirical CDF including v
+ val dl = ix / n // local empirical CDF excluding v
+ val cdfVal = cdf(v)
+ (dl - cdfVal, dp - cdfVal)
+ }
+ }
+
+ /**
+ * Search the unadjusted differences in a partition and return the
+ * two extrema (furthest below and furthest above CDF), along with a count of elements in that
+ * partition
+ * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
+ * and CDF in a partition, which come as a tuple of
+ * (empirical CDF - 1/N - CDF, empirical CDF - CDF)
+ * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
+ */
+ private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
+ : Iterator[(Double, Double, Double)] = {
+ val initAcc = (Double.MaxValue, Double.MinValue, 0.0) // (min, max, count) sentinels
+ val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
+ (math.min(pMin, dl), math.max(pMax, dp), pCt + 1) // min over dl, max over dp, running count
+ }
+ val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
+ results.iterator // empty when the fold never ran (empty partition), else exactly one tuple
+ }
+
+ /**
+ * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
+ * adjusting local extrema estimates from individual partitions with the amount of elements in
+ * preceding partitions
+ * @param localData `Array[(Double, Double, Double)]` A local array containing the collected
+ * results of `searchOneSampleCandidates` across all partitions, in partition order
+ * @param n `Double` The size of the RDD
+ * @return The one-sample Kolmogorov Smirnov Statistic (`Double.MinValue` if `localData` is empty)
+ */
+ private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
+ : Double = {
+ val initAcc = (Double.MinValue, 0.0) // (largest distance so far, elements seen so far)
+ // adjust differences based on the number of elements preceding it, which should provide
+ // the correct distance between empirical CDF and CDF
+ val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
+ val adjConst = prevCt / n // empirical CDF mass of all preceding partitions
+ val dist1 = math.abs(minCand + adjConst) // adjusted distance for the partition's minimum
+ val dist2 = math.abs(maxCand + adjConst) // adjusted distance for the partition's maximum
+ val maxVal = Array(prevMax, dist1, dist2).max
+ (maxVal, prevCt + ct)
+ }
+ results._1
+ }
+
+ /**
* A convenience function that allows running the KS test for 1 set of sample data against
* a named distribution
* @param data the sample data that we wish to evaluate