aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Reynold Xin <rxin@databricks.com> 2015-08-09 10:58:36 -0700
committer Reynold Xin <rxin@databricks.com> 2015-08-09 10:58:36 -0700
commit e9c36938ba972b6fe3c9f6228508e3c9f1c876b2 (patch)
tree 20ecaccaa7f3e1e1cd97b6246afffe2cc6d37abc /core
parent 3ca995b78f373251081f6877623649bfba3040b2 (diff)
download spark-e9c36938ba972b6fe3c9f6228508e3c9f1c876b2.tar.gz
spark-e9c36938ba972b6fe3c9f6228508e3c9f1c876b2.tar.bz2
spark-e9c36938ba972b6fe3c9f6228508e3c9f1c876b2.zip
[SPARK-9752][SQL] Support UnsafeRow in Sample operator.
In order for this to work, I had to disable gap sampling.

Author: Reynold Xin <rxin@databricks.com>

Closes #8040 from rxin/SPARK-9752 and squashes the following commits:

f9e248c [Reynold Xin] Fix the test case for real this time.
adbccb3 [Reynold Xin] Fixed test case.
589fb23 [Reynold Xin] Merge branch 'SPARK-9752' of github.com:rxin/spark into SPARK-9752
55ccddc [Reynold Xin] Fixed core test.
78fa895 [Reynold Xin] [SPARK-9752][SQL] Support UnsafeRow in Sample operator.
c9e7112 [Reynold Xin] [SPARK-9752][SQL] Support UnsafeRow in Sample operator.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala 18
1 files changed, 12 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index 786b97ad7b..c156b03cdb 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -176,10 +176,15 @@ class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T
* A sampler for sampling with replacement, based on values drawn from Poisson distribution.
*
* @param fraction the sampling fraction (with replacement)
+ * @param useGapSamplingIfPossible if true, use gap sampling when sampling ratio is low.
* @tparam T item type
*/
@DeveloperApi
-class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](
+ fraction: Double,
+ useGapSamplingIfPossible: Boolean) extends RandomSampler[T, T] {
+
+ def this(fraction: Double) = this(fraction, useGapSamplingIfPossible = true)
/** Epsilon slop to avoid failure from floating point jitter. */
require(
@@ -199,17 +204,18 @@ class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T]
override def sample(items: Iterator[T]): Iterator[T] = {
if (fraction <= 0.0) {
Iterator.empty
- } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
- new GapSamplingReplacementIterator(items, fraction, rngGap, RandomSampler.rngEpsilon)
+ } else if (useGapSamplingIfPossible &&
+ fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+ new GapSamplingReplacementIterator(items, fraction, rngGap, RandomSampler.rngEpsilon)
} else {
- items.flatMap { item => {
+ items.flatMap { item =>
val count = rng.sample()
if (count == 0) Iterator.empty else Iterator.fill(count)(item)
- }}
+ }
}
}
- override def clone: PoissonSampler[T] = new PoissonSampler[T](fraction)
+ override def clone: PoissonSampler[T] = new PoissonSampler[T](fraction, useGapSamplingIfPossible)
}