diff options
author | Marcin Tustin <mtustin@handybook.com> | 2016-04-03 17:42:33 -0700 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-04-03 17:42:33 -0700 |
commit | 9023015f059327b3ce4a7eaf71e57ac77b84ad7b (patch) | |
tree | 6f1b7d71a36f8acc573a9e8cce31ddd05efa50fb /core/src/main/scala/org/apache | |
parent | c238cd07448f94bbceb661daad90b6a6d597a846 (diff) | |
download | spark-9023015f059327b3ce4a7eaf71e57ac77b84ad7b.tar.gz spark-9023015f059327b3ce4a7eaf71e57ac77b84ad7b.tar.bz2 spark-9023015f059327b3ce4a7eaf71e57ac77b84ad7b.zip |
[SPARK-14163][CORE] SumEvaluator and countApprox cannot reliably handle RDDs of size 1
## What changes were proposed in this pull request?
This special-cases counts of 0 and 1 to avoid passing 0 degrees of freedom to `TDistribution`.
## How was this patch tested?
Tests run successfully. New test added.
## Note:
This recreates #11982, which was closed due to a non-updated diff. rxin and srowen commented there.
This also adds tests, reworks the code to perform the special casing (based on srowen's comments), and adds equality machinery for BoundedDouble, as well as changing how it is transformed to string.
Author: Marcin Tustin <mtustin@handybook.com>
Author: Marcin Tustin <mtustin@handy.com>
Closes #12016 from mtustin-handy/SPARK-14163.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- | core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala | 18 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala | 36 |
2 files changed, 41 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala index 48b9434153..d06b2c67d2 100644 --- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala +++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala @@ -21,5 +21,23 @@ package org.apache.spark.partial * A Double value with error bars and associated confidence. */ class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) { + override def toString(): String = "[%.3f, %.3f]".format(low, high) + + override def hashCode: Int = + this.mean.hashCode ^ this.confidence.hashCode ^ this.low.hashCode ^ this.high.hashCode + + /** + * Note that consistent with Double, any NaN value will make equality false + */ + override def equals(that: Any): Boolean = + that match { + case that: BoundedDouble => { + this.mean == that.mean && + this.confidence == that.confidence && + this.low == that.low && + this.high == that.high + } + case _ => false + } } diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala index 44295e5a1a..5fe3358316 100644 --- a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala @@ -29,8 +29,9 @@ import org.apache.spark.util.StatCounter private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[StatCounter, BoundedDouble] { + // modified in merge var outputsMerged = 0 - var counter = new StatCounter + val counter = new StatCounter override def merge(outputId: Int, taskResult: StatCounter) { outputsMerged += 1 @@ -40,30 +41,39 @@ private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) override def currentResult(): BoundedDouble = { if (outputsMerged == totalOutputs) { new BoundedDouble(counter.sum, 1.0, counter.sum, counter.sum) - } 
else if (outputsMerged == 0) { + } else if (outputsMerged == 0 || counter.count == 0) { new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity) } else { val p = outputsMerged.toDouble / totalOutputs val meanEstimate = counter.mean - val meanVar = counter.sampleVariance / counter.count val countEstimate = (counter.count + 1 - p) / p - val countVar = (counter.count + 1) * (1 - p) / (p * p) val sumEstimate = meanEstimate * countEstimate - val sumVar = (meanEstimate * meanEstimate * countVar) + - (countEstimate * countEstimate * meanVar) + - (meanVar * countVar) - val sumStdev = math.sqrt(sumVar) - val confFactor = { - if (counter.count > 100) { + + val meanVar = counter.sampleVariance / counter.count + + // branch at this point because counter.count == 1 implies counter.sampleVariance == Nan + // and we don't want to ever return a bound of NaN + if (meanVar.isNaN || counter.count == 1) { + new BoundedDouble(sumEstimate, confidence, Double.NegativeInfinity, Double.PositiveInfinity) + } else { + val countVar = (counter.count + 1) * (1 - p) / (p * p) + val sumVar = (meanEstimate * meanEstimate * countVar) + + (countEstimate * countEstimate * meanVar) + + (meanVar * countVar) + val sumStdev = math.sqrt(sumVar) + val confFactor = if (counter.count > 100) { new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) } else { + // note that if this goes to 0, TDistribution will throw an exception. + // Hence special casing 1 above. val degreesOfFreedom = (counter.count - 1).toInt new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2) } + + val low = sumEstimate - confFactor * sumStdev + val high = sumEstimate + confFactor * sumStdev + new BoundedDouble(sumEstimate, confidence, low, high) } - val low = sumEstimate - confFactor * sumStdev - val high = sumEstimate + confFactor * sumStdev - new BoundedDouble(sumEstimate, confidence, low, high) } } } |