about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorChandan Kumar <chandan.kumar@imaginea.com>2014-08-18 09:52:25 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-18 09:52:25 -0700
commitf45efbb8aaa65bc46d65e77e93076fbc29f4455d (patch)
tree6191f8f9130875f2b9b468cb376c66569f294b10 /core
parentc0cbbdeaf4f2033be03d32e3ea0288812b4edbf6 (diff)
downloadspark-f45efbb8aaa65bc46d65e77e93076fbc29f4455d.tar.gz
spark-f45efbb8aaa65bc46d65e77e93076fbc29f4455d.tar.bz2
spark-f45efbb8aaa65bc46d65e77e93076fbc29f4455d.zip
[SPARK-2862] histogram method fails on some choices of bucketCount
Author: Chandan Kumar <chandan.kumar@imaginea.com>

Closes #1787 from nrchandan/spark-2862 and squashes the following commits:

a76bbf6 [Chandan Kumar] [SPARK-2862] Fix for a broken test case and add new test cases
4211eea [Chandan Kumar] [SPARK-2862] Add Scala bug id
13854f1 [Chandan Kumar] [SPARK-2862] Use shorthand range notation to avoid Scala bug
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala23
2 files changed, 34 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index f233544d12..e0494ee396 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* If the elements in RDD do not vary (max == min) always returns a single bucket.
*/
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
- // Compute the minimum and the maxium
+ // Scala's built-in range has issues. See #SI-8782
+ def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
+ val span = max - min
+ Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
+ }
+ // Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items =>
Iterator(items.foldRight(Double.NegativeInfinity,
Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
- val increment = (max-min)/bucketCount.toDouble
- val range = if (increment != 0) {
- Range.Double.inclusive(min, max, increment)
+ val range = if (min != max) {
+ // Range.Double.inclusive(min, max, increment)
+ // The above code doesn't always work. See Scala bug #SI-8782.
+ // https://issues.scala-lang.org/browse/SI-8782
+ customRange(min, max, bucketCount)
} else {
List(min, min)
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index a822bd18bf..f89bdb6e07 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
assert(histogramBuckets === expectedHistogramBuckets)
}
+ test("WorksWithoutBucketsForLargerDatasets") {
+ // Verify the case of slighly larger datasets
+ val rdd = sc.parallelize(6 to 99)
+ val (histogramBuckets, histogramResults) = rdd.histogram(8)
+ val expectedHistogramResults =
+ Array(12, 12, 11, 12, 12, 11, 12, 12)
+ val expectedHistogramBuckets =
+ Array(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+
+ test("WorksWithoutBucketsWithIrrationalBucketEdges") {
+ // Verify the case of buckets with irrational edges. See #SPARK-2862.
+ val rdd = sc.parallelize(6 to 99)
+ val (histogramBuckets, histogramResults) = rdd.histogram(9)
+ val expectedHistogramResults =
+ Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets(0) === 6.0)
+ assert(histogramBuckets(9) === 99.0)
+ }
+
// Test the failure mode with an invalid RDD
test("ThrowsExceptionOnInvalidRDDs") {
// infinity