diff options
author | Sean Owen <sowen@cloudera.com> | 2014-10-27 10:53:15 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-10-27 10:53:15 -0700 |
commit | bfa614b12795f1cfce4de0950f90cb8c4f2a7d53 (patch) | |
tree | e968afcba36be2d27db36bc17c26f38f1a491a48 /core/src/main | |
parent | 1d7bcc88401d66c8d17a075355acfc25a8b7615c (diff) | |
download | spark-bfa614b12795f1cfce4de0950f90cb8c4f2a7d53.tar.gz spark-bfa614b12795f1cfce4de0950f90cb8c4f2a7d53.tar.bz2 spark-bfa614b12795f1cfce4de0950f90cb8c4f2a7d53.zip |
SPARK-4022 [CORE] [MLLIB] Replace colt dependency (LGPL) with commons-math
This change replaces usages of colt with commons-math3 equivalents, and makes some minor necessary adjustments to related code and tests to match.
Author: Sean Owen <sowen@cloudera.com>
Closes #2928 from srowen/SPARK-4022 and squashes the following commits:
61a232f [Sean Owen] Fix failure due to different sampling in JavaAPISuite.sample()
16d66b8 [Sean Owen] Simplify seeding with call to reseedRandomGenerator
a1a78e0 [Sean Owen] Use Well19937c
31c7641 [Sean Owen] Fix Python Poisson test by choosing a different seed; about 88% of seeds should work but 1 didn't, it seems
5c9c67f [Sean Owen] Additional test fixes from review
d8f88e0 [Sean Owen] Replace colt with commons-math3. Some tests do not pass yet.
Diffstat (limited to 'core/src/main')
8 files changed, 50 insertions, 34 deletions
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala index 3155dfe165..637492a975 100644 --- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.NormalDistribution /** * An ApproximateEvaluator for counts. @@ -46,7 +46,8 @@ private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double) val mean = (sum + 1 - p) / p val variance = (sum + 1) * (1 - p) / (p * p) val stdev = math.sqrt(variance) - val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2) + val confFactor = new NormalDistribution(). + inverseCumulativeProbability(1 - (1 - confidence) / 2) val low = mean - confFactor * stdev val high = mean + confFactor * stdev new BoundedDouble(mean, confidence, low, high) diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index 8bb78123e3..3ef3cc219d 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -24,7 +24,7 @@ import scala.collection.Map import scala.collection.mutable.HashMap import scala.reflect.ClassTag -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.NormalDistribution import org.apache.spark.util.collection.OpenHashMap @@ -55,7 +55,8 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf new HashMap[T, BoundedDouble] } else { val p = outputsMerged.toDouble / totalOutputs - val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2) + val confFactor = new NormalDistribution(). + inverseCumulativeProbability(1 - (1 - confidence) / 2) val result = new JHashMap[T, BoundedDouble](sums.size) sums.foreach { case (key, sum) => val mean = (sum + 1 - p) / p diff --git a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala index d24959cba8..787a21a61f 100644 --- a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.{NormalDistribution, TDistribution} import org.apache.spark.util.StatCounter @@ -45,9 +45,10 @@ private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double) val stdev = math.sqrt(counter.sampleVariance / counter.count) val confFactor = { if (counter.count > 100) { - Probability.normalInverse(1 - (1 - confidence) / 2) + new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) } else { - Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt) + val degreesOfFreedom = (counter.count - 1).toInt + new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2) } } val low = mean - confFactor * stdev diff --git a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala index 92915ee66d..828bf96c2c 100644 --- a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala +++ b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.{TDistribution, NormalDistribution} /** * A utility class for caching Student's T distribution values for a given confidence level @@ -25,8 +25,10 @@ import cern.jet.stat.Probability * confidence intervals for many keys. */ private[spark] class StudentTCacher(confidence: Double) { + val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation - val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2) + + val normalApprox = new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0) def get(sampleSize: Long): Double = { @@ -35,7 +37,8 @@ private[spark] class StudentTCacher(confidence: Double) { } else { val size = sampleSize.toInt if (cache(size) < 0) { - cache(size) = Probability.studentTInverse(1 - confidence, size - 1) + val tDist = new TDistribution(size - 1) + cache(size) = tDist.inverseCumulativeProbability(1 - (1 - confidence) / 2) } cache(size) } diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala index d533628457..1753c2561b 100644 --- a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.{TDistribution, NormalDistribution} import org.apache.spark.util.StatCounter @@ -55,9 +55,10 @@ private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) val sumStdev = math.sqrt(sumVar) val confFactor = { if (counter.count > 100) { - Probability.normalInverse(1 - (1 - confidence) / 2) + new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) } else { - Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt) + val degreesOfFreedom = (counter.count - 1).toInt + new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2) } } val low = sumEstimate - confFactor * sumStdev diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala index b097c30f8c..9e8cee5331 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala @@ -21,8 +21,7 @@ import java.util.Random import scala.reflect.ClassTag -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.{Partition, TaskContext} @@ -53,9 +52,11 @@ private[spark] class SampledRDD[T: ClassTag]( if (withReplacement) { // For large datasets, the expected number of occurrences of each element in a sample with // replacement is Poisson(frac). We use that to get a count for each element. - val poisson = new Poisson(frac, new DRand(split.seed)) + val poisson = new PoissonDistribution(frac) + poisson.reseedRandomGenerator(split.seed) + firstParent[T].iterator(split.prev, context).flatMap { element => - val count = poisson.nextInt() + val count = poisson.sample() if (count == 0) { Iterator.empty // Avoid object allocation when we return 0 items, which is quite often } else { diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 32c5fdad75..ee389def20 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -19,8 +19,7 @@ package org.apache.spark.util.random import java.util.Random -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.annotation.DeveloperApi @@ -87,15 +86,16 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) @DeveloperApi class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] { - private[random] var rng = new Poisson(mean, new DRand) + private[random] var rng = new PoissonDistribution(mean) override def setSeed(seed: Long) { - rng = new Poisson(mean, new DRand(seed.toInt)) + rng = new PoissonDistribution(mean) + rng.reseedRandomGenerator(seed) } override def sample(items: Iterator[T]): Iterator[T] = { items.flatMap { item => - val count = rng.nextInt() + val count = rng.sample() if (count == 0) { Iterator.empty } else { diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala index 8f95d7c6b7..4fa357edd6 100644 --- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala @@ -22,8 +22,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.Logging import org.apache.spark.SparkContext._ @@ -209,7 +208,7 @@ private[spark] object StratifiedSamplingUtils extends Logging { samplingRateByKey = computeThresholdByKey(finalResult, fractions) } (idx: Int, iter: Iterator[(K, V)]) => { - val rng = new RandomDataGenerator + val rng = new RandomDataGenerator() rng.reSeed(seed + idx) // Must use the same invoke pattern on the rng as in getSeqOp for without replacement // in order to generate the same sequence of random numbers when creating the sample @@ -245,9 +244,9 @@ private[spark] object StratifiedSamplingUtils extends Logging { // Must use the same invoke pattern on the rng as in getSeqOp for with replacement // in order to generate the same sequence of random numbers when creating the sample val copiesAccepted = if (acceptBound == 0) 0L else rng.nextPoisson(acceptBound) - val copiesWailisted = rng.nextPoisson(finalResult(key).waitListBound) + val copiesWaitlisted = rng.nextPoisson(finalResult(key).waitListBound) val copiesInSample = copiesAccepted + - (0 until copiesWailisted).count(i => rng.nextUniform() < thresholdByKey(key)) + (0 until copiesWaitlisted).count(i => rng.nextUniform() < thresholdByKey(key)) if (copiesInSample > 0) { Iterator.fill(copiesInSample.toInt)(item) } else { @@ -261,10 +260,10 @@ private[spark] object StratifiedSamplingUtils extends Logging { rng.reSeed(seed + idx) iter.flatMap { item => val count = rng.nextPoisson(fractions(item._1)) - if (count > 0) { - Iterator.fill(count)(item) - } else { + if (count == 0) { Iterator.empty + } else { + Iterator.fill(count)(item) } } } @@ -274,15 +273,24 @@ private[spark] object StratifiedSamplingUtils extends Logging { /** A random data generator that generates both uniform values and Poisson values. */ private class RandomDataGenerator { val uniform = new XORShiftRandom() - var poisson = new Poisson(1.0, new DRand) + // commons-math3 doesn't have a method to generate Poisson from an arbitrary mean; + // maintain a cache of Poisson(m) distributions for various m + val poissonCache = mutable.Map[Double, PoissonDistribution]() + var poissonSeed = 0L - def reSeed(seed: Long) { + def reSeed(seed: Long): Unit = { uniform.setSeed(seed) - poisson = new Poisson(1.0, new DRand(seed.toInt)) + poissonSeed = seed + poissonCache.clear() } def nextPoisson(mean: Double): Int = { - poisson.nextInt(mean) + val poisson = poissonCache.getOrElseUpdate(mean, { + val newPoisson = new PoissonDistribution(mean) + newPoisson.reseedRandomGenerator(poissonSeed) + newPoisson + }) + poisson.sample() } def nextUniform(): Double = { |