-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                          |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala       | 286
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                      |   9
-rw-r--r--  core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala  | 606
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala             |   4
5 files changed, 790 insertions, 121 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b7f125d01d..c169b2d3fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
+ SamplingUtils}
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -375,7 +376,8 @@ abstract class RDD[T: ClassTag](
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
- new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
+ new PartitionwiseSampledRDD[T, T](
+ this, new BernoulliCellSampler[T](x(0), x(1)), true, seed)
}.toArray
}
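
Note: a minimal sketch (illustrative weights, not part of the patch) of how randomSplit's
normalized cumulative weights become the adjacent [lb, ub) acceptance cells that
BernoulliCellSampler consumes:

    val weights = Array(1.0, 2.0, 3.0)
    val sum = weights.sum
    // scanLeft turns normalized weights into cumulative bounds: 0.0, 1/6, 1/2, 1.0
    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
    // sliding(2) pairs adjacent bounds into cells: (0.0, 1/6), (1/6, 1/2), (1/2, 1.0)
    val cells = normalizedCumWeights.sliding(2).map(x => (x(0), x(1))).toList
    // Each split keeps exactly the items whose uniform draw (from the shared seed)
    // lands in its cell, so the splits are disjoint and together cover the RDD.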
diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index ee389def20..76e7a2760b 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -19,6 +19,9 @@ package org.apache.spark.util.random
import java.util.Random
+import scala.reflect.ClassTag
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.commons.math3.distribution.PoissonDistribution
import org.apache.spark.annotation.DeveloperApi
@@ -38,13 +41,47 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
/** take a random sample */
def sample(items: Iterator[T]): Iterator[U]
+ /** return a copy of the RandomSampler object */
override def clone: RandomSampler[T, U] =
throw new NotImplementedError("clone() is not implemented.")
}
+private[spark]
+object RandomSampler {
+ /** Default random number generator used by random samplers. */
+ def newDefaultRNG: Random = new XORShiftRandom
+
+ /**
+ * Default maximum gap-sampling fraction.
+ * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a new RNG
+   * is to experiment. When tuning for a new RNG, a value of 0.5 is a reasonable initial
+   * guess in most cases.
+ */
+ val defaultMaxGapSamplingFraction = 0.4
+
+ /**
+ * Default epsilon for floating point numbers sampled from the RNG.
+ * The gap-sampling compute logic requires taking log(x), where x is sampled from an RNG.
+ * To guard against errors from taking log(0), a positive epsilon lower bound is applied.
+ * A good value for this parameter is at or near the minimum positive floating
+ * point value returned by "nextDouble()" (or equivalent), for the RNG being used.
+ */
+ val rngEpsilon = 5e-11
+
+ /**
+   * Sampling fraction arguments may be results of computation, and subject to floating
+   * point jitter. The arguments are checked with this epsilon slop factor to prevent
+   * spurious failures for cases such as summing some numbers to get a sampling fraction
+   * of 1.000000001.
+ */
+ val roundingEpsilon = 1e-6
+}
+
/**
* :: DeveloperApi ::
- * A sampler based on Bernoulli trials.
+ * A sampler based on Bernoulli trials for partitioning a data sequence.
*
* @param lb lower bound of the acceptance range
* @param ub upper bound of the acceptance range
@@ -52,57 +89,262 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
* @tparam T item type
*/
@DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false)
extends RandomSampler[T, T] {
- private[random] var rng: Random = new XORShiftRandom
+ /** epsilon slop to avoid failure from floating point jitter. */
+ require(
+ lb <= (ub + RandomSampler.roundingEpsilon),
+ s"Lower bound ($lb) must be <= upper bound ($ub)")
+ require(
+ lb >= (0.0 - RandomSampler.roundingEpsilon),
+ s"Lower bound ($lb) must be >= 0.0")
+ require(
+ ub <= (1.0 + RandomSampler.roundingEpsilon),
+ s"Upper bound ($ub) must be <= 1.0")
- def this(ratio: Double) = this(0.0d, ratio)
+ private val rng: Random = new XORShiftRandom
override def setSeed(seed: Long) = rng.setSeed(seed)
override def sample(items: Iterator[T]): Iterator[T] = {
- items.filter { item =>
- val x = rng.nextDouble()
- (x >= lb && x < ub) ^ complement
+ if (ub - lb <= 0.0) {
+ if (complement) items else Iterator.empty
+ } else {
+ if (complement) {
+ items.filter { item => {
+ val x = rng.nextDouble()
+ (x < lb) || (x >= ub)
+ }}
+ } else {
+ items.filter { item => {
+ val x = rng.nextDouble()
+ (x >= lb) && (x < ub)
+ }}
+ }
}
}
/**
* Return a sampler that is the complement of the range specified of the current sampler.
*/
- def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+ def cloneComplement(): BernoulliCellSampler[T] =
+ new BernoulliCellSampler[T](lb, ub, !complement)
+
+ override def clone = new BernoulliCellSampler[T](lb, ub, complement)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+ /** epsilon slop to avoid failure from floating point jitter */
+ require(
+ fraction >= (0.0 - RandomSampler.roundingEpsilon)
+ && fraction <= (1.0 + RandomSampler.roundingEpsilon),
+ s"Sampling fraction ($fraction) must be on interval [0, 1]")
- override def clone = new BernoulliSampler[T](lb, ub, complement)
+ private val rng: Random = RandomSampler.newDefaultRNG
+
+ override def setSeed(seed: Long) = rng.setSeed(seed)
+
+ override def sample(items: Iterator[T]): Iterator[T] = {
+ if (fraction <= 0.0) {
+ Iterator.empty
+ } else if (fraction >= 1.0) {
+ items
+ } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+ new GapSamplingIterator(items, fraction, rng, RandomSampler.rngEpsilon)
+ } else {
+ items.filter { _ => rng.nextDouble() <= fraction }
+ }
+ }
+
+ override def clone = new BernoulliSampler[T](fraction)
}
+
/**
* :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
*
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
* @tparam T item type
*/
@DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+ /** Epsilon slop to avoid failure from floating point jitter. */
+ require(
+ fraction >= (0.0 - RandomSampler.roundingEpsilon),
+ s"Sampling fraction ($fraction) must be >= 0")
- private[random] var rng = new PoissonDistribution(mean)
+ // PoissonDistribution throws an exception when fraction <= 0
+ // If fraction is <= 0, Iterator.empty is used below, so we can use any placeholder value.
+ private val rng = new PoissonDistribution(if (fraction > 0.0) fraction else 1.0)
+ private val rngGap = RandomSampler.newDefaultRNG
override def setSeed(seed: Long) {
- rng = new PoissonDistribution(mean)
rng.reseedRandomGenerator(seed)
+ rngGap.setSeed(seed)
}
override def sample(items: Iterator[T]): Iterator[T] = {
- items.flatMap { item =>
- val count = rng.sample()
- if (count == 0) {
- Iterator.empty
- } else {
- Iterator.fill(count)(item)
- }
+ if (fraction <= 0.0) {
+ Iterator.empty
+ } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+ new GapSamplingReplacementIterator(items, fraction, rngGap, RandomSampler.rngEpsilon)
+ } else {
+ items.flatMap { item => {
+ val count = rng.sample()
+ if (count == 0) Iterator.empty else Iterator.fill(count)(item)
+ }}
+ }
+ }
+
+ override def clone = new PoissonSampler[T](fraction)
+}
+
+
+private[spark]
+class GapSamplingIterator[T: ClassTag](
+ var data: Iterator[T],
+ f: Double,
+ rng: Random = RandomSampler.newDefaultRNG,
+ epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] {
+
+ require(f > 0.0 && f < 1.0, s"Sampling fraction ($f) must reside on open interval (0, 1)")
+ require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+ /** implement efficient linear-sequence drop until Scala includes fix for jira SI-8835. */
+ private val iterDrop: Int => Unit = {
+ val arrayClass = Array.empty[T].iterator.getClass
+ val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
+ data.getClass match {
+ case `arrayClass` => ((n: Int) => { data = data.drop(n) })
+ case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
+ case _ => ((n: Int) => {
+ var j = 0
+ while (j < n && data.hasNext) {
+ data.next()
+ j += 1
+ }
+ })
+ }
+ }
+
+ override def hasNext: Boolean = data.hasNext
+
+ override def next(): T = {
+ val r = data.next()
+ advance
+ r
+ }
+
+ private val lnq = math.log1p(-f)
+
+ /** skip elements that won't be sampled, according to geometric dist P(k) = (f)(1-f)^k. */
+ private def advance: Unit = {
+ val u = math.max(rng.nextDouble(), epsilon)
+ val k = (math.log(u) / lnq).toInt
+ iterDrop(k)
+ }
+
+ /** advance to first sample as part of object construction. */
+ advance
+  // This must be invoked last: advance depends on lnq and iterDrop, and Scala
+  // initializes vals in declaration order, so calling it any earlier would see
+  // uninitialized fields and fail in confusing ways.
+}
+
+private[spark]
+class GapSamplingReplacementIterator[T: ClassTag](
+ var data: Iterator[T],
+ f: Double,
+ rng: Random = RandomSampler.newDefaultRNG,
+ epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] {
+
+ require(f > 0.0, s"Sampling fraction ($f) must be > 0")
+ require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+  /** implement efficient linear-sequence drop until Scala includes fix for jira SI-8835. */
+ private val iterDrop: Int => Unit = {
+ val arrayClass = Array.empty[T].iterator.getClass
+ val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
+ data.getClass match {
+ case `arrayClass` => ((n: Int) => { data = data.drop(n) })
+ case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
+ case _ => ((n: Int) => {
+ var j = 0
+ while (j < n && data.hasNext) {
+ data.next()
+ j += 1
+ }
+ })
+ }
+ }
+
+ /** current sampling value, and its replication factor, as we are sampling with replacement. */
+ private var v: T = _
+ private var rep: Int = 0
+
+ override def hasNext: Boolean = data.hasNext || rep > 0
+
+ override def next(): T = {
+ val r = v
+ rep -= 1
+ if (rep <= 0) advance
+ r
+ }
+
+ /**
+ * Skip elements with replication factor zero (i.e. elements that won't be sampled).
+   * Samples 'k' from geometric distribution P(k) = (1-q)(q)^k, where q = e^(-f), that is,
+   * q is the probability of Poisson(0; f).
+ */
+ private def advance: Unit = {
+ val u = math.max(rng.nextDouble(), epsilon)
+ val k = (math.log(u) / (-f)).toInt
+ iterDrop(k)
+ // set the value and replication factor for the next value
+ if (data.hasNext) {
+ v = data.next()
+ rep = poissonGE1
+ }
+ }
+
+ private val q = math.exp(-f)
+
+ /**
+ * Sample from Poisson distribution, conditioned such that the sampled value is >= 1.
+   * This is an adaptation of the algorithm for generating Poisson-distributed random variables:
+ * http://en.wikipedia.org/wiki/Poisson_distribution
+ */
+ private def poissonGE1: Int = {
+    // simulate that the standard Poisson sampling
+ // gave us at least one iteration, for a sample of >= 1
+ var pp = q + ((1.0 - q) * rng.nextDouble())
+ var r = 1
+
+ // now continue with standard poisson sampling algorithm
+ pp *= rng.nextDouble()
+ while (pp > q) {
+ r += 1
+ pp *= rng.nextDouble()
}
+ r
}
- override def clone = new PoissonSampler[T](mean)
+ /** advance to first sample as part of object construction. */
+ advance
+  // This must be invoked last: advance depends on iterDrop and (via poissonGE1) on q,
+  // and Scala initializes vals in declaration order, so calling it any earlier would
+  // see uninitialized fields and fail in confusing ways.
}
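
Note: gap sampling replaces one RNG draw per input element with one draw per accepted
element. A self-contained sketch of the skip computation in GapSamplingIterator.advance,
using plain java.util.Random in place of Spark's XORShiftRandom (geometricSkip is a name
invented here):

    import java.util.Random

    // With per-element acceptance probability f, the run of rejected elements
    // before the next accepted one is geometric: P(k) = f * (1 - f)^k. Inverting
    // the CDF turns a single uniform draw into the entire skip length.
    def geometricSkip(rng: Random, f: Double, epsilon: Double = 5e-11): Int = {
      val lnq = math.log1p(-f)                     // log(1 - f), stable for small f
      val u = math.max(rng.nextDouble(), epsilon)  // guard against log(0)
      (math.log(u) / lnq).toInt                    // floor of the exponential sample
    }

    val rng = new Random(42L)
    val skips = Array.fill(100000)(geometricSkip(rng, 0.1))
    // Mean skip is (1 - f) / f; for f = 0.1 that is about 9, so one RNG draw
    // does the work of roughly ten in the naive filter-based sampler.
    println(skips.sum.toDouble / skips.length)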
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 0172876a26..c21a4b30d7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -140,11 +140,10 @@ public class JavaAPISuite implements Serializable {
public void sample() {
List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> rdd = sc.parallelize(ints);
- JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11);
- // expected 2 but of course result varies randomly a bit
- Assert.assertEquals(1, sample20.count());
- JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11);
- Assert.assertEquals(2, sample20NoReplacement.count());
+ JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 3);
+ Assert.assertEquals(2, sample20.count());
+ JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 5);
+ Assert.assertEquals(2, sample20WithoutReplacement.count());
}
@Test
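
Note on the seed changes above: the rewritten samplers consume the RNG stream
differently, so the old seeds yield different (but still deterministic) counts, and the
seeds were re-chosen to keep the exact assertions. A sketch of the determinism relied on
(countWithSeed is a name invented here):

    import org.apache.spark.util.random.BernoulliSampler

    def countWithSeed(seed: Long): Int = {
      val sampler = new BernoulliSampler[Int](0.2)
      sampler.setSeed(seed)
      sampler.sample((1 to 10).iterator).size
    }
    // Same seed, same RNG stream, same count: this is what makes the
    // Assert.assertEquals calls above stable across runs.
    assert(countWithSeed(42L) == countWithSeed(42L))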
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index ba67d766a7..20944b6247 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -18,97 +18,523 @@
package org.apache.spark.util.random
import java.util.Random
-
+import scala.collection.mutable.ArrayBuffer
import org.apache.commons.math3.distribution.PoissonDistribution
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.mock.EasyMockSugar
-
-class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
-
- val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
-
- var random: Random = _
- var poisson: PoissonDistribution = _
-
- before {
- random = mock[Random]
- poisson = mock[PoissonDistribution]
- }
-
- test("BernoulliSamplerWithRange") {
- expecting {
- for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
- random.nextDouble().andReturn(x)
- }
- }
- whenExecuting(random) {
- val sampler = new BernoulliSampler[Int](0.25, 0.55)
- sampler.rng = random
- assert(sampler.sample(a.iterator).toList == List(3, 4, 5))
- }
- }
-
- test("BernoulliSamplerWithRangeInverse") {
- expecting {
- for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
- random.nextDouble().andReturn(x)
- }
- }
- whenExecuting(random) {
- val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
- sampler.rng = random
- assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9))
- }
- }
-
- test("BernoulliSamplerWithRatio") {
- expecting {
- for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
- random.nextDouble().andReturn(x)
- }
- }
- whenExecuting(random) {
- val sampler = new BernoulliSampler[Int](0.35)
- sampler.rng = random
- assert(sampler.sample(a.iterator).toList == List(1, 2, 3))
- }
- }
-
- test("BernoulliSamplerWithComplement") {
- expecting {
- for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
- random.nextDouble().andReturn(x)
- }
- }
- whenExecuting(random) {
- val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
- sampler.rng = random
- assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9))
- }
- }
-
- test("BernoulliSamplerSetSeed") {
- expecting {
- random.setSeed(10L)
- }
- whenExecuting(random) {
- val sampler = new BernoulliSampler[Int](0.2)
- sampler.rng = random
- sampler.setSeed(10L)
- }
- }
-
- test("PoissonSampler") {
- expecting {
- for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) {
- poisson.sample().andReturn(x)
- }
- }
- whenExecuting(poisson) {
- val sampler = new PoissonSampler[Int](0.2)
- sampler.rng = poisson
- assert(sampler.sample(a.iterator).toList == List(2, 3, 3, 5, 6))
- }
+import org.scalatest.{FunSuite, Matchers}
+
+class RandomSamplerSuite extends FunSuite with Matchers {
+ /**
+ * My statistical testing methodology is to run a Kolmogorov-Smirnov (KS) test
+ * between the random samplers and simple reference samplers (known to work correctly).
+ * The sampling gap sizes between chosen samples should show up as having the same
+ * distributions between test and reference, if things are working properly. That is,
+ * the KS test will fail to strongly reject the null hypothesis that the distributions of
+ * sampling gaps are the same.
+   * There are no actual KS tests implemented for Scala (that I can find), so what I
+   * have done here is pre-compute "D", the KS statistic, that corresponds to a "weak"
+ * p-value for a particular sample size. I can then test that my measured KS stats
+ * are less than D. Computing D-values is easy, and implemented below.
+ *
+ * I used the scipy 'kstwobign' distribution to pre-compute my D value:
+ *
+ * def ksdval(q=0.1, n=1000):
+ * en = np.sqrt(float(n) / 2.0)
+ * return stats.kstwobign.isf(float(q)) / (en + 0.12 + 0.11 / en)
+ *
+   * When comparing KS stats, I take the median of a small number of independent test runs
+   * to compensate for the fact that any sampled statistic will show a "false positive" with
+   * some probability. Even when two distributions are the same, they will register as
+   * different 10% of the time at a p-value of 0.1.
+ */
+
+ // This D value is the precomputed KS statistic for p-value 0.1, sample size 1000:
+ val sampleSize = 1000
+ val D = 0.0544280747619
+
+ // I'm not a big fan of fixing seeds, but unit testing based on running statistical tests
+ // will always fail with some nonzero probability, so I'll fix the seed to prevent these
+ // tests from generating random failure noise in CI testing, etc.
+ val rngSeed: Random = RandomSampler.newDefaultRNG
+ rngSeed.setSeed(235711)
+
+ // Reference implementation of sampling without replacement (bernoulli)
+ def sample[T](data: Iterator[T], f: Double): Iterator[T] = {
+ val rng: Random = RandomSampler.newDefaultRNG
+ rng.setSeed(rngSeed.nextLong)
+ data.filter(_ => (rng.nextDouble <= f))
+ }
+
+ // Reference implementation of sampling with replacement
+ def sampleWR[T](data: Iterator[T], f: Double): Iterator[T] = {
+ val rng = new PoissonDistribution(f)
+ rng.reseedRandomGenerator(rngSeed.nextLong)
+ data.flatMap { v => {
+ val rep = rng.sample()
+ if (rep == 0) Iterator.empty else Iterator.fill(rep)(v)
+ }}
+ }
+
+ // Returns iterator over gap lengths between samples.
+ // This function assumes input data is integers sampled from the sequence of
+ // increasing integers: {0, 1, 2, ...}. This works because that is how I generate them,
+ // and the samplers preserve their input order
+ def gaps(data: Iterator[Int]): Iterator[Int] = {
+ data.sliding(2).withPartial(false).map { x => x(1) - x(0) }
+ }
+
+ // Returns the cumulative distribution from a histogram
+ def cumulativeDist(hist: Array[Int]): Array[Double] = {
+ val n = hist.sum.toDouble
+ assert(n > 0.0)
+ hist.scanLeft(0)(_ + _).drop(1).map { _.toDouble / n }
+ }
+
+ // Returns aligned cumulative distributions from two arrays of data
+ def cumulants(d1: Array[Int], d2: Array[Int],
+ ss: Int = sampleSize): (Array[Double], Array[Double]) = {
+ assert(math.min(d1.length, d2.length) > 0)
+ assert(math.min(d1.min, d2.min) >= 0)
+ val m = 1 + math.max(d1.max, d2.max)
+ val h1 = Array.fill[Int](m)(0)
+ val h2 = Array.fill[Int](m)(0)
+ for (v <- d1) { h1(v) += 1 }
+ for (v <- d2) { h2(v) += 1 }
+ assert(h1.sum == h2.sum)
+ assert(h1.sum == ss)
+ (cumulativeDist(h1), cumulativeDist(h2))
+ }
+
+ // Computes the Kolmogorov-Smirnov 'D' statistic from two cumulative distributions
+ def KSD(cdf1: Array[Double], cdf2: Array[Double]): Double = {
+ assert(cdf1.length == cdf2.length)
+ val n = cdf1.length
+ assert(n > 0)
+ assert(cdf1(n-1) == 1.0)
+ assert(cdf2(n-1) == 1.0)
+ cdf1.zip(cdf2).map { x => Math.abs(x._1 - x._2) }.max
+ }
+
+ // Returns the median KS 'D' statistic between two samples, over (m) sampling trials
+ def medianKSD(data1: => Iterator[Int], data2: => Iterator[Int], m: Int = 5): Double = {
+ val t = Array.fill[Double](m) {
+ val (c1, c2) = cumulants(data1.take(sampleSize).toArray,
+ data2.take(sampleSize).toArray)
+ KSD(c1, c2)
+ }.sorted
+ // return the median KS statistic
+ t(m / 2)
+ }
+
+ test("utilities") {
+ val s1 = Array(0, 1, 1, 0, 2)
+ val s2 = Array(1, 0, 3, 2, 1)
+ val (c1, c2) = cumulants(s1, s2, ss = 5)
+ c1 should be (Array(0.4, 0.8, 1.0, 1.0))
+ c2 should be (Array(0.2, 0.6, 0.8, 1.0))
+ KSD(c1, c2) should be (0.2 +- 0.000001)
+ KSD(c2, c1) should be (KSD(c1, c2))
+ gaps(List(0, 1, 1, 2, 4, 11).iterator).toArray should be (Array(1, 0, 1, 2, 7))
+ }
+
+ test("sanity check medianKSD against references") {
+ var d: Double = 0.0
+
+ // should be statistically same, i.e. fail to reject null hypothesis strongly
+ d = medianKSD(gaps(sample(Iterator.from(0), 0.5)), gaps(sample(Iterator.from(0), 0.5)))
+ d should be < D
+
+ // should be statistically different - null hypothesis will have high D value,
+ // corresponding to low p-value that rejects the null hypothesis
+ d = medianKSD(gaps(sample(Iterator.from(0), 0.4)), gaps(sample(Iterator.from(0), 0.5)))
+ d should be > D
+
+ // same!
+ d = medianKSD(gaps(sampleWR(Iterator.from(0), 0.5)), gaps(sampleWR(Iterator.from(0), 0.5)))
+ d should be < D
+
+ // different!
+ d = medianKSD(gaps(sampleWR(Iterator.from(0), 0.5)), gaps(sampleWR(Iterator.from(0), 0.6)))
+ d should be > D
+ }
+
+ test("bernoulli sampling") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.5)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.7)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.7)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.9)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new BernoulliSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.6)))
+ d should be > D
+ }
+
+ test("bernoulli sampling with gap sampling optimization") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.01)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.01)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.1)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.3)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new BernoulliSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.4)))
+ d should be > D
+ }
+
+ test("bernoulli boundary cases") {
+ val data = (1 to 100).toArray
+
+ var sampler = new BernoulliSampler[Int](0.0)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliSampler[Int](1.0)
+ sampler.sample(data.iterator).toArray should be (data)
+
+ sampler = new BernoulliSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliSampler[Int](1.0 + (RandomSampler.roundingEpsilon / 2.0))
+ sampler.sample(data.iterator).toArray should be (data)
+ }
+
+ test("bernoulli data types") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+ var sampler = new BernoulliSampler[Int](0.1)
+ sampler.setSeed(rngSeed.nextLong)
+
+ // Array iterator (indexable type)
+ d = medianKSD(
+ gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toArray.iterator)),
+ gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ // ArrayBuffer iterator (indexable type)
+ d = medianKSD(
+ gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+ gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ // List iterator (non-indexable type)
+ d = medianKSD(
+ gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toList.iterator)),
+ gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+ }
+
+ test("bernoulli clone") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d = 0.0
+ var sampler = new BernoulliSampler[Int](0.1).clone
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.9).clone
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+ d should be < D
+ }
+
+ test("bernoulli set seed") {
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+ var sampler1 = new BernoulliSampler[Int](0.2)
+ var sampler2 = new BernoulliSampler[Int](0.2)
+
+ // distributions should be identical if seeds are set same
+ sampler1.setSeed(73)
+ sampler2.setSeed(73)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be (0.0)
+
+ // should be different for different seeds
+ sampler1.setSeed(73)
+ sampler2.setSeed(37)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be > 0.0
+ d should be < D
+
+ sampler1 = new BernoulliSampler[Int](0.8)
+ sampler2 = new BernoulliSampler[Int](0.8)
+
+ // distributions should be identical if seeds are set same
+ sampler1.setSeed(73)
+ sampler2.setSeed(73)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be (0.0)
+
+ // should be different for different seeds
+ sampler1.setSeed(73)
+ sampler2.setSeed(37)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be > 0.0
+ d should be < D
+ }
+
+ test("replacement sampling") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ var sampler = new PoissonSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.5)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.7)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.7)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.9)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.9)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new PoissonSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.6)))
+ d should be > D
+ }
+
+ test("replacement sampling with gap sampling") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ var sampler = new PoissonSampler[Int](0.01)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.01)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.1)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.3)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new PoissonSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.4)))
+ d should be > D
+ }
+
+ test("replacement boundary cases") {
+ val data = (1 to 100).toArray
+
+ var sampler = new PoissonSampler[Int](0.0)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new PoissonSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ // sampling with replacement has no upper bound on sampling fraction
+ sampler = new PoissonSampler[Int](2.0)
+ sampler.sample(data.iterator).length should be > (data.length)
+ }
+
+ test("replacement data types") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+ var sampler = new PoissonSampler[Int](0.1)
+ sampler.setSeed(rngSeed.nextLong)
+
+ // Array iterator (indexable type)
+ d = medianKSD(
+ gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toArray.iterator)),
+ gaps(sampleWR(Iterator.from(0), 0.1)))
+ d should be < D
+
+ // ArrayBuffer iterator (indexable type)
+ d = medianKSD(
+ gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+ gaps(sampleWR(Iterator.from(0), 0.1)))
+ d should be < D
+
+ // List iterator (non-indexable type)
+ d = medianKSD(
+ gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toList.iterator)),
+ gaps(sampleWR(Iterator.from(0), 0.1)))
+ d should be < D
+ }
+
+ test("replacement clone") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d = 0.0
+ var sampler = new PoissonSampler[Int](0.1).clone
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.9).clone
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.9)))
+ d should be < D
+ }
+
+ test("replacement set seed") {
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+ var sampler1 = new PoissonSampler[Int](0.2)
+ var sampler2 = new PoissonSampler[Int](0.2)
+
+ // distributions should be identical if seeds are set same
+ sampler1.setSeed(73)
+ sampler2.setSeed(73)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be (0.0)
+
+ // should be different for different seeds
+ sampler1.setSeed(73)
+ sampler2.setSeed(37)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be > 0.0
+ d should be < D
+
+ sampler1 = new PoissonSampler[Int](0.8)
+ sampler2 = new PoissonSampler[Int](0.8)
+
+ // distributions should be identical if seeds are set same
+ sampler1.setSeed(73)
+ sampler2.setSeed(73)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be (0.0)
+
+ // should be different for different seeds
+ sampler1.setSeed(73)
+ sampler2.setSeed(37)
+ d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+ d should be > 0.0
+ d should be < D
+ }
+
+ test("bernoulli partitioning sampling") {
+ var d: Double = 0.0
+
+ var sampler = new BernoulliCellSampler[Int](0.1, 0.2)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new BernoulliCellSampler[Int](0.1, 0.2, true)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+ d should be < D
+ }
+
+ test("bernoulli partitioning boundary cases") {
+ val data = (1 to 100).toArray
+ val d = RandomSampler.roundingEpsilon / 2.0
+
+ var sampler = new BernoulliCellSampler[Int](0.0, 0.0)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](1.0, 1.0)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](0.0, 1.0)
+ sampler.sample(data.iterator).toArray should be (data)
+
+ sampler = new BernoulliCellSampler[Int](0.0 - d, 1.0 + d)
+ sampler.sample(data.iterator).toArray should be (data)
+
+ sampler = new BernoulliCellSampler[Int](0.5, 0.5 - d)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+ }
+
+ test("bernoulli partitioning data") {
+ val seed = rngSeed.nextLong
+ val data = (1 to 100).toArray
+
+ var sampler = new BernoulliCellSampler[Int](0.4, 0.6)
+ sampler.setSeed(seed)
+ val s1 = sampler.sample(data.iterator).toArray
+ s1.length should be > 0
+
+ sampler = new BernoulliCellSampler[Int](0.4, 0.6, true)
+ sampler.setSeed(seed)
+ val s2 = sampler.sample(data.iterator).toArray
+ s2.length should be > 0
+
+ (s1 ++ s2).sorted should be (data)
+
+ sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+ sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](0.5, 0.5, true)
+ sampler.sample(data.iterator).toArray should be (data)
+ }
+
+ test("bernoulli partitioning clone") {
+ val seed = rngSeed.nextLong
+ val data = (1 to 100).toArray
+ val base = new BernoulliCellSampler[Int](0.35, 0.65)
+
+ var sampler = base.clone
+ sampler.setSeed(seed)
+ val s1 = sampler.sample(data.iterator).toArray
+ s1.length should be > 0
+
+ sampler = base.cloneComplement
+ sampler.setSeed(seed)
+ val s2 = sampler.sample(data.iterator).toArray
+ s2.length should be > 0
+
+ (s1 ++ s2).sorted should be (data)
}
}
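
Note: the precomputed D above comes from scipy's kstwobign, per the methodology comment
in the suite. A dependency-free cross-check sketch (kolmogorovSF and isf are names
invented here) that inverts the asymptotic Kolmogorov survival function and applies the
same small-sample correction as the scipy snippet:

    // Survival function of the Kolmogorov distribution:
    // Q(t) = 2 * sum_{k >= 1} (-1)^(k-1) * exp(-2 * k^2 * t^2)
    def kolmogorovSF(t: Double): Double =
      2.0 * (1 to 100).map { k =>
        (if (k % 2 == 1) 1.0 else -1.0) * math.exp(-2.0 * k * k * t * t)
      }.sum

    // Inverse survival function by bisection; Q is decreasing for t > 0.
    def isf(q: Double): Double = {
      var lo = 0.0
      var hi = 5.0
      for (_ <- 0 until 60) {
        val mid = (lo + hi) / 2
        if (kolmogorovSF(mid) > q) lo = mid else hi = mid
      }
      (lo + hi) / 2
    }

    val en = math.sqrt(1000 / 2.0)
    println(isf(0.1) / (en + 0.12 + 0.11 / en))  // ~0.0544, matching D above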
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index b88e08bf14..9353351af7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
-import org.apache.spark.util.random.BernoulliSampler
+import org.apache.spark.util.random.BernoulliCellSampler
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.storage.StorageLevel
@@ -244,7 +244,7 @@ object MLUtils {
def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = {
val numFoldsF = numFolds.toFloat
(1 to numFolds).map { fold =>
- val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
+ val sampler = new BernoulliCellSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
complement = false)
val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed)
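
Note: a minimal sketch (illustrative numFolds, not part of the patch) of the cell layout
kFold builds. Fold k's validation sampler accepts uniform draws in [(k-1)/K, k/K) and its
cloneComplement() accepts everything else, so validation and training partition the data
exactly, and the K validation cells tile [0, 1):

    val numFolds = 3
    val numFoldsF = numFolds.toFloat
    val cells = (1 to numFolds).map { fold =>
      ((fold - 1) / numFoldsF, fold / numFoldsF)
    }
    // cells == Vector((0.0, 0.333...), (0.333..., 0.666...), (0.666..., 1.0))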