aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <simonh@tw.ibm.com>2016-03-28 09:58:47 -0700
committerDavies Liu <davies.liu@gmail.com>2016-03-28 09:58:47 -0700
commit68c0c460bfc51d7f69d09b613c49c212dd0b375c (patch)
tree3c1ee9e3d56e7c8e41c54b70a6bf8472c0078d54 /core/src/test/scala/org/apache
parentc8388297c436691a236520d2396deaf556aedb0e (diff)
downloadspark-68c0c460bfc51d7f69d09b613c49c212dd0b375c.tar.gz
spark-68c0c460bfc51d7f69d09b613c49c212dd0b375c.tar.bz2
spark-68c0c460bfc51d7f69d09b613c49c212dd0b375c.zip
[SPARK-13742] [CORE] Add non-iterator interface to RandomSampler
JIRA: https://issues.apache.org/jira/browse/SPARK-13742 ## What changes were proposed in this pull request? `RandomSampler.sample` currently accepts iterator as input and output another iterator. This makes it inappropriate to use in wholestage codegen of `Sampler` operator #11517. This change is to add non-iterator interface to `RandomSampler`. This change adds a new method `def sample(): Int` to the trait `RandomSampler`. As we don't need to know the actual values of the sampling items, so this new method takes no arguments. This method will decide whether to sample the next item or not. It returns how many times the next item will be sampled. For `BernoulliSampler` and `BernoulliCellSampler`, the returned sampling times can only be 0 or 1. It simply means whether to sample the next item or not. For `PoissonSampler`, the returned value can be more than 1, meaning the next item will be sampled multiple times. ## How was this patch tested? Tests are added into `RandomSamplerSuite`. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Author: Liang-Chi Hsieh <viirya@appier.com> Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #11578 from viirya/random-sampler-no-iterator.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala197
2 files changed, 199 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
index 132a5fa9a8..cb0de1c6be 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
@@ -29,6 +29,8 @@ class MockSampler extends RandomSampler[Long, Long] {
s = seed
}
+ override def sample(): Int = 1
+
override def sample(items: Iterator[Long]): Iterator[Long] = {
Iterator(s)
}
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index 791491daf0..7eb2f56c20 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -129,6 +129,13 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
t(m / 2)
}
+ def replacementSampling(data: Iterator[Int], sampler: PoissonSampler[Int]): Iterator[Int] = {
+ data.flatMap { item =>
+ val count = sampler.sample()
+ if (count == 0) Iterator.empty else Iterator.fill(count)(item)
+ }
+ }
+
test("utilities") {
val s1 = Array(0, 1, 1, 0, 2)
val s2 = Array(1, 0, 3, 2, 1)
@@ -189,6 +196,36 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
d should be > D
}
+ test("bernoulli sampling without iterator") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ val data = Iterator.from(0)
+
+ var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.5)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.7)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.7)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.9)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.9)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new BernoulliSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.6)))
+ d should be > D
+ }
+
test("bernoulli sampling with gap sampling optimization") {
// Tests expect maximum gap sampling fraction to be this value
RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
@@ -217,6 +254,37 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
d should be > D
}
+ test("bernoulli sampling (without iterator) with gap sampling optimization") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ val data = Iterator.from(0)
+
+ var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.01)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)),
+ gaps(sample(Iterator.from(0), 0.01)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.1)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new BernoulliSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.3)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new BernoulliSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.4)))
+ d should be > D
+ }
+
test("bernoulli boundary cases") {
val data = (1 to 100).toArray
@@ -233,6 +301,22 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
sampler.sample(data.iterator).toArray should be (data)
}
+ test("bernoulli (without iterator) boundary cases") {
+ val data = (1 to 100).toArray
+
+ var sampler = new BernoulliSampler[Int](0.0)
+ data.filter(_ => sampler.sample() > 0) should be (Array.empty[Int])
+
+ sampler = new BernoulliSampler[Int](1.0)
+ data.filter(_ => sampler.sample() > 0) should be (data)
+
+ sampler = new BernoulliSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+ data.filter(_ => sampler.sample() > 0) should be (Array.empty[Int])
+
+ sampler = new BernoulliSampler[Int](1.0 + (RandomSampler.roundingEpsilon / 2.0))
+ data.filter(_ => sampler.sample() > 0) should be (data)
+ }
+
test("bernoulli data types") {
// Tests expect maximum gap sampling fraction to be this value
RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
@@ -341,6 +425,36 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
d should be > D
}
+ test("replacement sampling without iterator") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ val data = Iterator.from(0)
+
+ var sampler = new PoissonSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.5)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.7)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.7)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.9)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.9)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new PoissonSampler[Int](0.5)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.6)))
+ d should be > D
+ }
+
test("replacement sampling with gap sampling") {
// Tests expect maximum gap sampling fraction to be this value
RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
@@ -369,6 +483,36 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
d should be > D
}
+ test("replacement sampling (without iterator) with gap sampling") {
+ // Tests expect maximum gap sampling fraction to be this value
+ RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+ var d: Double = 0.0
+
+ val data = Iterator.from(0)
+
+ var sampler = new PoissonSampler[Int](0.01)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.01)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.1)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new PoissonSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.3)))
+ d should be < D
+
+ // sampling at different frequencies should show up as statistically different:
+ sampler = new PoissonSampler[Int](0.3)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(replacementSampling(data, sampler)), gaps(sampleWR(Iterator.from(0), 0.4)))
+ d should be > D
+ }
+
test("replacement boundary cases") {
val data = (1 to 100).toArray
@@ -383,6 +527,20 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
sampler.sample(data.iterator).length should be > (data.length)
}
+ test("replacement (without) boundary cases") {
+ val data = (1 to 100).toArray
+
+ var sampler = new PoissonSampler[Int](0.0)
+ replacementSampling(data.iterator, sampler).toArray should be (Array.empty[Int])
+
+ sampler = new PoissonSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+ replacementSampling(data.iterator, sampler).toArray should be (Array.empty[Int])
+
+ // sampling with replacement has no upper bound on sampling fraction
+ sampler = new PoissonSampler[Int](2.0)
+ replacementSampling(data.iterator, sampler).length should be > (data.length)
+ }
+
test("replacement data types") {
// Tests expect maximum gap sampling fraction to be this value
RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
@@ -477,6 +635,22 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
d should be < D
}
+ test("bernoulli partitioning sampling without iterator") {
+ var d: Double = 0.0
+
+ val data = Iterator.from(0)
+
+ var sampler = new BernoulliCellSampler[Int](0.1, 0.2)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.1)))
+ d should be < D
+
+ sampler = new BernoulliCellSampler[Int](0.1, 0.2, true)
+ sampler.setSeed(rngSeed.nextLong)
+ d = medianKSD(gaps(data.filter(_ => sampler.sample() > 0)), gaps(sample(Iterator.from(0), 0.9)))
+ d should be < D
+ }
+
test("bernoulli partitioning boundary cases") {
val data = (1 to 100).toArray
val d = RandomSampler.roundingEpsilon / 2.0
@@ -500,6 +674,29 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
sampler.sample(data.iterator).toArray should be (Array.empty[Int])
}
+ test("bernoulli partitioning (without iterator) boundary cases") {
+ val data = (1 to 100).toArray
+ val d = RandomSampler.roundingEpsilon / 2.0
+
+ var sampler = new BernoulliCellSampler[Int](0.0, 0.0)
+ data.filter(_ => sampler.sample() > 0).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+ data.filter(_ => sampler.sample() > 0).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](1.0, 1.0)
+ data.filter(_ => sampler.sample() > 0).toArray should be (Array.empty[Int])
+
+ sampler = new BernoulliCellSampler[Int](0.0, 1.0)
+ data.filter(_ => sampler.sample() > 0).toArray should be (data)
+
+ sampler = new BernoulliCellSampler[Int](0.0 - d, 1.0 + d)
+ data.filter(_ => sampler.sample() > 0).toArray should be (data)
+
+ sampler = new BernoulliCellSampler[Int](0.5, 0.5 - d)
+ data.filter(_ => sampler.sample() > 0).toArray should be (Array.empty[Int])
+ }
+
test("bernoulli partitioning data") {
val seed = rngSeed.nextLong
val data = (1 to 100).toArray