diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala index 44295e5a1a..5fe3358316 100644 --- a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala @@ -29,8 +29,9 @@ import org.apache.spark.util.StatCounter private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[StatCounter, BoundedDouble] { + // modified in merge var outputsMerged = 0 - var counter = new StatCounter + val counter = new StatCounter override def merge(outputId: Int, taskResult: StatCounter) { outputsMerged += 1 @@ -40,30 +41,39 @@ private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) override def currentResult(): BoundedDouble = { if (outputsMerged == totalOutputs) { new BoundedDouble(counter.sum, 1.0, counter.sum, counter.sum) - } else if (outputsMerged == 0) { + } else if (outputsMerged == 0 || counter.count == 0) { new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity) } else { val p = outputsMerged.toDouble / totalOutputs val meanEstimate = counter.mean - val meanVar = counter.sampleVariance / counter.count val countEstimate = (counter.count + 1 - p) / p - val countVar = (counter.count + 1) * (1 - p) / (p * p) val sumEstimate = meanEstimate * countEstimate - val sumVar = (meanEstimate * meanEstimate * countVar) + - (countEstimate * countEstimate * meanVar) + - (meanVar * countVar) - val sumStdev = math.sqrt(sumVar) - val confFactor = { - if (counter.count > 100) { + + val meanVar = counter.sampleVariance / counter.count + + // branch at this point because counter.count == 1 implies counter.sampleVariance == Nan + // and we don't want to ever return a bound of NaN + if (meanVar.isNaN || counter.count == 1) { + new BoundedDouble(sumEstimate, confidence, Double.NegativeInfinity, Double.PositiveInfinity) + } else { + val countVar = (counter.count + 1) * (1 - p) / (p * p) + val sumVar = (meanEstimate * meanEstimate * countVar) + + (countEstimate * countEstimate * meanVar) + + (meanVar * countVar) + val sumStdev = math.sqrt(sumVar) + val confFactor = if (counter.count > 100) { new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) } else { + // note that if this goes to 0, TDistribution will throw an exception. + // Hence special casing 1 above. val degreesOfFreedom = (counter.count - 1).toInt new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2) } + + val low = sumEstimate - confFactor * sumStdev + val high = sumEstimate + confFactor * sumStdev + new BoundedDouble(sumEstimate, confidence, low, high) } - val low = sumEstimate - confFactor * sumStdev - val high = sumEstimate + confFactor * sumStdev - new BoundedDouble(sumEstimate, confidence, low, high) } } } |