aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-17 15:26:36 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-17 15:26:36 -0700
commitdb42451a52ed5b0f228c87ddeb07c118e9d56ef6 (patch)
tree27a49c91e3c61dd7eacc2c903513e906be64d6c0
parente82a2ffcc93fb72ac04b29d1ed13f26f6ac52f6c (diff)
parentf91195cc150a3ead122046d14bd35b4fcf28c9cb (diff)
downloadspark-db42451a52ed5b0f228c87ddeb07c118e9d56ef6.tar.gz
spark-db42451a52ed5b0f228c87ddeb07c118e9d56ef6.tar.bz2
spark-db42451a52ed5b0f228c87ddeb07c118e9d56ef6.zip
Merge pull request #643 from adatao/master
Bug fix: Zero-length partitions result in NaN for overall mean & variance
-rw-r--r--core/src/main/scala/spark/util/StatCounter.scala26
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala21
-rw-r--r--project/plugins.sbt2
3 files changed, 37 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 5f80180339..2b980340b7 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -37,17 +37,23 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
if (other == this) {
merge(other.copy()) // Avoid overwriting fields in a weird order
} else {
- val delta = other.mu - mu
- if (other.n * 10 < n) {
- mu = mu + (delta * other.n) / (n + other.n)
- } else if (n * 10 < other.n) {
- mu = other.mu - (delta * n) / (n + other.n)
- } else {
- mu = (mu * n + other.mu * other.n) / (n + other.n)
+ if (n == 0) {
+ mu = other.mu
+ m2 = other.m2
+ n = other.n
+ } else if (other.n != 0) {
+ val delta = other.mu - mu
+ if (other.n * 10 < n) {
+ mu = mu + (delta * other.n) / (n + other.n)
+ } else if (n * 10 < other.n) {
+ mu = other.mu - (delta * n) / (n + other.n)
+ } else {
+ mu = (mu * n + other.mu * other.n) / (n + other.n)
+ }
+ m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
+ n += other.n
}
- m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
- n += other.n
- this
+ this
}
}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 60db759c25..16f93e71a3 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -1,10 +1,10 @@
package spark
import org.scalatest.FunSuite
-
import scala.collection.mutable.ArrayBuffer
-
import SparkContext._
+import spark.util.StatCounter
+import scala.math.abs
class PartitioningSuite extends FunSuite with LocalSparkContext {
@@ -120,4 +120,21 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
}
+
+ test("Zero-length partitions should be correctly handled") {
+ // Create RDD with some consecutive empty partitions (including the "first" one)
+ sc = new SparkContext("local", "test")
+ val rdd: RDD[Double] = sc
+ .parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
+ .filter(_ >= 0.0)
+
+ // Run the partitions, including the consecutive empty ones, through StatCounter
+ val stats: StatCounter = rdd.stats();
+ assert(abs(6.0 - stats.sum) < 0.01);
+ assert(abs(6.0/2 - rdd.mean) < 0.01);
+ assert(abs(1.0 - rdd.variance) < 0.01);
+ assert(abs(1.0 - rdd.stdev) < 0.01);
+
+ // Add other tests here for classes that should be able to handle empty partitions correctly
+ }
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index d4f2442872..25b812a28d 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -16,3 +16,5 @@ addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
//addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6")
+
+libraryDependencies += "com.novocode" % "junit-interface" % "0.10-M4" % "test"