author     Sean Owen <sowen@cloudera.com>      2014-10-27 10:53:15 -0700
committer  Xiangrui Meng <meng@databricks.com> 2014-10-27 10:53:15 -0700
commit     bfa614b12795f1cfce4de0950f90cb8c4f2a7d53 (patch)
tree       e968afcba36be2d27db36bc17c26f38f1a491a48 /core
parent     1d7bcc88401d66c8d17a075355acfc25a8b7615c (diff)
SPARK-4022 [CORE] [MLLIB] Replace colt dependency (LGPL) with commons-math
This change replaces usages of colt with commons-math3 equivalents, and makes some minor necessary adjustments to related code and tests to match.

Author: Sean Owen <sowen@cloudera.com>

Closes #2928 from srowen/SPARK-4022 and squashes the following commits:

61a232f [Sean Owen] Fix failure due to different sampling in JavaAPISuite.sample()
16d66b8 [Sean Owen] Simplify seeding with call to reseedRandomGenerator
a1a78e0 [Sean Owen] Use Well19937c
31c7641 [Sean Owen] Fix Python Poisson test by choosing a different seed; about 88% of seeds should work but 1 didn't, it seems
5c9c67f [Sean Owen] Additional test fixes from review
d8f88e0 [Sean Owen] Replace colt with commons-math3. Some tests do not pass yet.
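The patch swaps three colt entry points for their commons-math3 equivalents. A minimal sketch of the mapping, not taken from the patch itself (object and method names here are illustrative):

```scala
import org.apache.commons.math3.distribution.{NormalDistribution, PoissonDistribution, TDistribution}

object ColtToCommonsMath3 {
  // colt: Probability.normalInverse(1 - (1 - confidence) / 2)
  def normalConfFactor(confidence: Double): Double =
    new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2)

  // colt: Probability.studentTInverse(1 - confidence, df). Colt's variant is
  // two-tailed, so the one-tailed commons-math3 call evaluates 1 - (1 - confidence) / 2.
  def tConfFactor(confidence: Double, df: Int): Double =
    new TDistribution(df).inverseCumulativeProbability(1 - (1 - confidence) / 2)

  // colt: new Poisson(mean, new DRand(seed.toInt)).nextInt()
  def poissonSample(mean: Double, seed: Long): Int = {
    val dist = new PoissonDistribution(mean) // mean must be > 0
    dist.reseedRandomGenerator(seed)         // takes the full Long seed
    dist.sample()
  }
}
```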
Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml                                                                    |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala               |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala        |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala                |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala               |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala                 |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala                       |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala            | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala  | 32
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                           |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala       |  9
11 files changed, 56 insertions(+), 45 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 320d1076f7..5cd21e18e8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -85,8 +85,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
- <version>3.3</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
@@ -163,10 +161,6 @@
<version>3.2.10</version>
</dependency>
<dependency>
- <groupId>colt</groupId>
- <artifactId>colt</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<classifier>${mesos.classifier}</classifier>
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
index 3155dfe165..637492a975 100644
--- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
@@ -17,7 +17,7 @@
package org.apache.spark.partial
-import cern.jet.stat.Probability
+import org.apache.commons.math3.distribution.NormalDistribution
/**
* An ApproximateEvaluator for counts.
@@ -46,7 +46,8 @@ private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
val mean = (sum + 1 - p) / p
val variance = (sum + 1) * (1 - p) / (p * p)
val stdev = math.sqrt(variance)
- val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
+ val confFactor = new NormalDistribution().
+ inverseCumulativeProbability(1 - (1 - confidence) / 2)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
new BoundedDouble(mean, confidence, low, high)
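CountEvaluator extrapolates the count seen so far to a total and wraps it in a normal confidence interval. A worked sketch with hypothetical numbers (sum, outputsMerged, and totalOutputs are made up; the formulas follow the code above):

```scala
import org.apache.commons.math3.distribution.NormalDistribution

val sum = 500L          // hypothetical count over the partitions seen so far
val outputsMerged = 5   // hypothetical partitions merged so far
val totalOutputs = 10
val confidence = 0.95

val p = outputsMerged.toDouble / totalOutputs  // fraction of output seen
val mean = (sum + 1 - p) / p                   // extrapolated total: ~1001
val variance = (sum + 1) * (1 - p) / (p * p)
val stdev = math.sqrt(variance)
val confFactor = new NormalDistribution()
  .inverseCumulativeProbability(1 - (1 - confidence) / 2)  // ~1.96 at 95%
println((mean - confFactor * stdev, mean + confFactor * stdev))
```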
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index 8bb78123e3..3ef3cc219d 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -24,7 +24,7 @@ import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-import cern.jet.stat.Probability
+import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.spark.util.collection.OpenHashMap
@@ -55,7 +55,8 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
new HashMap[T, BoundedDouble]
} else {
val p = outputsMerged.toDouble / totalOutputs
- val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
+ val confFactor = new NormalDistribution().
+ inverseCumulativeProbability(1 - (1 - confidence) / 2)
val result = new JHashMap[T, BoundedDouble](sums.size)
sums.foreach { case (key, sum) =>
val mean = (sum + 1 - p) / p
diff --git a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
index d24959cba8..787a21a61f 100644
--- a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
@@ -17,7 +17,7 @@
package org.apache.spark.partial
-import cern.jet.stat.Probability
+import org.apache.commons.math3.distribution.{NormalDistribution, TDistribution}
import org.apache.spark.util.StatCounter
@@ -45,9 +45,10 @@ private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
val stdev = math.sqrt(counter.sampleVariance / counter.count)
val confFactor = {
if (counter.count > 100) {
- Probability.normalInverse(1 - (1 - confidence) / 2)
+ new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2)
} else {
- Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
+ val degreesOfFreedom = (counter.count - 1).toInt
+ new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
}
}
val low = mean - confFactor * stdev
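MeanEvaluator switches from Student's t to the normal critical value once the sample is large; the t quantile is wider for small samples, which is the point of using it. A hedged sketch of the crossover (the 100-sample cutoff comes from the code above):

```scala
import org.apache.commons.math3.distribution.{NormalDistribution, TDistribution}

def confFactor(count: Long, confidence: Double): Double = {
  val p = 1 - (1 - confidence) / 2
  if (count > 100) {
    new NormalDistribution().inverseCumulativeProbability(p)  // t(df) ~ N(0,1) for large df
  } else {
    new TDistribution((count - 1).toInt).inverseCumulativeProbability(p)
  }
}

// confFactor(10, 0.95)   ~ 2.26  (t, df = 9: wider interval)
// confFactor(1000, 0.95) ~ 1.96  (normal approximation)
```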
diff --git a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
index 92915ee66d..828bf96c2c 100644
--- a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
+++ b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
@@ -17,7 +17,7 @@
package org.apache.spark.partial
-import cern.jet.stat.Probability
+import org.apache.commons.math3.distribution.{TDistribution, NormalDistribution}
/**
* A utility class for caching Student's T distribution values for a given confidence level
@@ -25,8 +25,10 @@ import cern.jet.stat.Probability
* confidence intervals for many keys.
*/
private[spark] class StudentTCacher(confidence: Double) {
+
val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation
- val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
+
+ val normalApprox = new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2)
val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
def get(sampleSize: Long): Double = {
@@ -35,7 +37,8 @@ private[spark] class StudentTCacher(confidence: Double) {
} else {
val size = sampleSize.toInt
if (cache(size) < 0) {
- cache(size) = Probability.studentTInverse(1 - confidence, size - 1)
+ val tDist = new TDistribution(size - 1)
+ cache(size) = tDist.inverseCumulativeProbability(1 - (1 - confidence) / 2)
}
cache(size)
}
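StudentTCacher avoids recomputing the t quantile for every key by caching one critical value per sample size. A standalone sketch of the same idea, assuming callers pass sampleSize >= 2 as the Spark code does:

```scala
import org.apache.commons.math3.distribution.{NormalDistribution, TDistribution}

class TCache(confidence: Double, normalApproxSampleSize: Int = 100) {
  private val p = 1 - (1 - confidence) / 2
  private val normalApprox = new NormalDistribution().inverseCumulativeProbability(p)
  private val cache = Array.fill(normalApproxSampleSize)(-1.0)  // -1 marks "not computed"

  def get(sampleSize: Long): Double =
    if (sampleSize >= normalApproxSampleSize) {
      normalApprox
    } else {
      val size = sampleSize.toInt
      if (cache(size) < 0) {
        cache(size) = new TDistribution(size - 1).inverseCumulativeProbability(p)
      }
      cache(size)
    }
}
```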
diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
index d533628457..1753c2561b 100644
--- a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
@@ -17,7 +17,7 @@
package org.apache.spark.partial
-import cern.jet.stat.Probability
+import org.apache.commons.math3.distribution.{TDistribution, NormalDistribution}
import org.apache.spark.util.StatCounter
@@ -55,9 +55,10 @@ private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
val sumStdev = math.sqrt(sumVar)
val confFactor = {
if (counter.count > 100) {
- Probability.normalInverse(1 - (1 - confidence) / 2)
+ new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2)
} else {
- Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
+ val degreesOfFreedom = (counter.count - 1).toInt
+ new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
}
}
val low = sumEstimate - confFactor * sumStdev
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
index b097c30f8c..9e8cee5331 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
@@ -21,8 +21,7 @@ import java.util.Random
import scala.reflect.ClassTag
-import cern.jet.random.Poisson
-import cern.jet.random.engine.DRand
+import org.apache.commons.math3.distribution.PoissonDistribution
import org.apache.spark.{Partition, TaskContext}
@@ -53,9 +52,11 @@ private[spark] class SampledRDD[T: ClassTag](
if (withReplacement) {
// For large datasets, the expected number of occurrences of each element in a sample with
// replacement is Poisson(frac). We use that to get a count for each element.
- val poisson = new Poisson(frac, new DRand(split.seed))
+ val poisson = new PoissonDistribution(frac)
+ poisson.reseedRandomGenerator(split.seed)
+
firstParent[T].iterator(split.prev, context).flatMap { element =>
- val count = poisson.nextInt()
+ val count = poisson.sample()
if (count == 0) {
Iterator.empty // Avoid object allocation when we return 0 items, which is quite often
} else {
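The comment in the hunk above states the trick: under sampling with replacement, each element's number of occurrences in the sample is Poisson(frac), so one Poisson draw per element replaces explicit resampling. A self-contained sketch over a plain iterator (names are illustrative; frac must be positive for PoissonDistribution):

```scala
import org.apache.commons.math3.distribution.PoissonDistribution

def sampleWithReplacement[T](items: Iterator[T], frac: Double, seed: Long): Iterator[T] = {
  val poisson = new PoissonDistribution(frac)  // requires frac > 0
  poisson.reseedRandomGenerator(seed)
  items.flatMap { item =>
    val count = poisson.sample()     // occurrences of this element in the sample
    if (count == 0) Iterator.empty   // avoid allocation for the common zero case
    else Iterator.fill(count)(item)
  }
}

// sampleWithReplacement(Iterator.range(0, 1000), 0.1, 42L) yields roughly 100 items.
```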
diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index 32c5fdad75..ee389def20 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -19,8 +19,7 @@ package org.apache.spark.util.random
import java.util.Random
-import cern.jet.random.Poisson
-import cern.jet.random.engine.DRand
+import org.apache.commons.math3.distribution.PoissonDistribution
import org.apache.spark.annotation.DeveloperApi
@@ -87,15 +86,16 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
@DeveloperApi
class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
- private[random] var rng = new Poisson(mean, new DRand)
+ private[random] var rng = new PoissonDistribution(mean)
override def setSeed(seed: Long) {
- rng = new Poisson(mean, new DRand(seed.toInt))
+ rng = new PoissonDistribution(mean)
+ rng.reseedRandomGenerator(seed)
}
override def sample(items: Iterator[T]): Iterator[T] = {
items.flatMap { item =>
- val count = rng.nextInt()
+ val count = rng.sample()
if (count == 0) {
Iterator.empty
} else {
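One behavioral gain from this swap: colt's DRand constructor only accepted an Int, so the old setSeed silently truncated with seed.toInt, while reseedRandomGenerator consumes the full Long. A brief usage sketch, assuming the PoissonSampler above is visible to the caller (it is marked @DeveloperApi):

```scala
import org.apache.spark.util.random.PoissonSampler

val sampler = new PoissonSampler[Int](0.5)
sampler.setSeed(0x1234567890abcdefL)  // full Long seed; DRand would have truncated this
val sampled = sampler.sample(Iterator.range(0, 100)).toSeq
```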
diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
index 8f95d7c6b7..4fa357edd6 100644
--- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -22,8 +22,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import cern.jet.random.Poisson
-import cern.jet.random.engine.DRand
+import org.apache.commons.math3.distribution.PoissonDistribution
import org.apache.spark.Logging
import org.apache.spark.SparkContext._
@@ -209,7 +208,7 @@ private[spark] object StratifiedSamplingUtils extends Logging {
samplingRateByKey = computeThresholdByKey(finalResult, fractions)
}
(idx: Int, iter: Iterator[(K, V)]) => {
- val rng = new RandomDataGenerator
+ val rng = new RandomDataGenerator()
rng.reSeed(seed + idx)
// Must use the same invoke pattern on the rng as in getSeqOp for without replacement
// in order to generate the same sequence of random numbers when creating the sample
@@ -245,9 +244,9 @@ private[spark] object StratifiedSamplingUtils extends Logging {
// Must use the same invoke pattern on the rng as in getSeqOp for with replacement
// in order to generate the same sequence of random numbers when creating the sample
val copiesAccepted = if (acceptBound == 0) 0L else rng.nextPoisson(acceptBound)
- val copiesWailisted = rng.nextPoisson(finalResult(key).waitListBound)
+ val copiesWaitlisted = rng.nextPoisson(finalResult(key).waitListBound)
val copiesInSample = copiesAccepted +
- (0 until copiesWailisted).count(i => rng.nextUniform() < thresholdByKey(key))
+ (0 until copiesWaitlisted).count(i => rng.nextUniform() < thresholdByKey(key))
if (copiesInSample > 0) {
Iterator.fill(copiesInSample.toInt)(item)
} else {
@@ -261,10 +260,10 @@ private[spark] object StratifiedSamplingUtils extends Logging {
rng.reSeed(seed + idx)
iter.flatMap { item =>
val count = rng.nextPoisson(fractions(item._1))
- if (count > 0) {
- Iterator.fill(count)(item)
- } else {
+ if (count == 0) {
Iterator.empty
+ } else {
+ Iterator.fill(count)(item)
}
}
}
@@ -274,15 +273,24 @@ private[spark] object StratifiedSamplingUtils extends Logging {
/** A random data generator that generates both uniform values and Poisson values. */
private class RandomDataGenerator {
val uniform = new XORShiftRandom()
- var poisson = new Poisson(1.0, new DRand)
+ // commons-math3 doesn't have a method to generate Poisson from an arbitrary mean;
+ // maintain a cache of Poisson(m) distributions for various m
+ val poissonCache = mutable.Map[Double, PoissonDistribution]()
+ var poissonSeed = 0L
- def reSeed(seed: Long) {
+ def reSeed(seed: Long): Unit = {
uniform.setSeed(seed)
- poisson = new Poisson(1.0, new DRand(seed.toInt))
+ poissonSeed = seed
+ poissonCache.clear()
}
def nextPoisson(mean: Double): Int = {
- poisson.nextInt(mean)
+ val poisson = poissonCache.getOrElseUpdate(mean, {
+ val newPoisson = new PoissonDistribution(mean)
+ newPoisson.reseedRandomGenerator(poissonSeed)
+ newPoisson
+ })
+ poisson.sample()
}
def nextUniform(): Double = {
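colt's Poisson.nextInt(mean) could vary the mean per call, but a commons-math3 PoissonDistribution fixes its mean at construction, hence the per-mean cache introduced above. A standalone sketch of the same pattern (class and method names here are illustrative):

```scala
import scala.collection.mutable
import org.apache.commons.math3.distribution.PoissonDistribution

class PoissonByMean(seed: Long) {
  private val cache = mutable.Map[Double, PoissonDistribution]()

  def next(mean: Double): Int =
    cache.getOrElseUpdate(mean, {
      val d = new PoissonDistribution(mean)
      d.reseedRandomGenerator(seed)  // each newly seen mean starts from the same seed
      d
    }).sample()
}

val gen = new PoissonByMean(42L)
gen.next(0.3); gen.next(2.0); gen.next(0.3)  // the second 0.3 draw reuses the cached stream
```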
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 814e40c4f7..0172876a26 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -142,7 +142,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> rdd = sc.parallelize(ints);
JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11);
// expected 2 but of course result varies randomly a bit
- Assert.assertEquals(3, sample20.count());
+ Assert.assertEquals(1, sample20.count());
JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11);
Assert.assertEquals(2, sample20NoReplacement.count());
}
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index 36877476e7..ba67d766a7 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.util.random
import java.util.Random
-import cern.jet.random.Poisson
+import org.apache.commons.math3.distribution.PoissonDistribution
+
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
@@ -28,11 +29,11 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
var random: Random = _
- var poisson: Poisson = _
+ var poisson: PoissonDistribution = _
before {
random = mock[Random]
- poisson = mock[Poisson]
+ poisson = mock[PoissonDistribution]
}
test("BernoulliSamplerWithRange") {
@@ -101,7 +102,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
test("PoissonSampler") {
expecting {
for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) {
- poisson.nextInt().andReturn(x)
+ poisson.sample().andReturn(x)
}
}
whenExecuting(poisson) {