diff options
author | Holden Karau <holden@pigscanfly.ca> | 2013-10-21 00:10:03 -0700 |
---|---|---|
committer | Holden Karau <holden@pigscanfly.ca> | 2013-10-21 00:10:03 -0700 |
commit | 699f7d28c0347cb516fa17f94b53d7bc50f18346 (patch) | |
tree | e35d59f5434eea37f57c35ea38da919f8ad7483d /core | |
parent | e58c69d955ef8faacb794a0c1666b21c1606453e (diff) | |
download | spark-699f7d28c0347cb516fa17f94b53d7bc50f18346.tar.gz spark-699f7d28c0347cb516fa17f94b53d7bc50f18346.tar.bz2 spark-699f7d28c0347cb516fa17f94b53d7bc50f18346.zip |
CR feedback
Diffstat (limited to 'core')
3 files changed, 125 insertions, 101 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index d2a2818e59..b002468442 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -167,12 +167,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav * value is 0 and the max is 100 and there are two buckets the resulting * buckets will be [0,50) [50,100]. bucketCount must be at least 1 * If the RDD contains infinity, NaN throws an exception - * If the elements in RDD do not vary (max == min) throws an exception + * If the elements in RDD do not vary (max == min) always returns a single bucket. */ def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { val result = srdd.histogram(bucketCount) (result._1.map(scala.Double.box(_)), result._2) } + /** * Compute a histogram using the provided buckets. The buckets are all open * to the left except for the last which is closed @@ -181,14 +182,21 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav * e.g 1<=x<10 , 10<=x<20, 20<=x<50 * And on the input of 1 and 50 we would have a histogram of 1,0,0 * - * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches - * from an O(log n) inseration to O(1) per element. (where n = # buckets) + * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched + * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets + * to true. * buckets must be sorted and not contain any duplicates. * buckets array must be at least two elements - * All NaN entries are treated the same. + * All NaN entries are treated the same. If you have a NaN bucket it must be + * the maximum value of the last position and all NaN entries will be counted + * in that bucket. */ def histogram(buckets: Array[Double]): Array[Long] = { - srdd.histogram(buckets.map(_.toDouble)) + srdd.histogram(buckets.map(_.toDouble), false) + } + + def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = { + srdd.histogram(buckets.map(_.toDouble), evenBuckets) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 776a83cefe..33738ee094 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -83,44 +83,50 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * Compute a histogram of the data using bucketCount number of buckets evenly * spaced between the minimum and maximum of the RDD. For example if the min * value is 0 and the max is 100 and there are two buckets the resulting - * buckets will be [0,50) [50,100]. bucketCount must be at least 1 + * buckets will be [0, 50) [50, 100]. bucketCount must be at least 1 * If the RDD contains infinity, NaN throws an exception - * If the elements in RDD do not vary (max == min) throws an exception + * If the elements in RDD do not vary (max == min) always returns a single bucket. */ def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { // Compute the minimum and the maxium val (max: Double, min: Double) = self.mapPartitions { items => Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) => - (x._1.max(e),x._2.min(e)))) + (x._1.max(e), x._2.min(e)))) }.reduce { (maxmin1, maxmin2) => (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2)) } if (max.isNaN() || max.isInfinity || min.isInfinity ) { - throw new UnsupportedOperationException("Histogram on either an empty RDD or RDD containing +-infinity or NaN") - } - if (max == min) { - throw new UnsupportedOperationException("Histogram with no range in elements") + throw new UnsupportedOperationException( + "Histogram on either an empty RDD or RDD containing +/-infinity or NaN") } val increment: Double = (max-min)/bucketCount.toDouble - val range = Range.Double.inclusive(min, max, increment) + val range = if (increment != 0) { + Range.Double.inclusive(min, max, increment) + } else { + List(min, min) + } val buckets: Array[Double] = range.toArray - (buckets,histogram(buckets)) + (buckets, histogram(buckets, true)) } + /** * Compute a histogram using the provided buckets. The buckets are all open * to the left except for the last which is closed * e.g. for the array - * [1,10,20,50] the buckets are [1,10) [10,20) [20,50] + * [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50] * e.g 1<=x<10 , 10<=x<20, 20<=x<50 - * And on the input of 1 and 50 we would have a histogram of 1,0,0 + * And on the input of 1 and 50 we would have a histogram of 1, 0, 0 * - * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches - * from an O(log n) inseration to O(1) per element. (where n = # buckets) + * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched + * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets + * to true. * buckets must be sorted and not contain any duplicates. * buckets array must be at least two elements - * All NaN entries are treated the same. + * All NaN entries are treated the same. If you have a NaN bucket it must be + * the maximum value of the last position and all NaN entries will be counted + * in that bucket. */ - def histogram(buckets: Array[Double]): Array[Long] = { + def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] = { if (buckets.length < 2) { throw new IllegalArgumentException("buckets array must have at least two elements") } @@ -129,11 +135,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { // to increment or returns None if there is no bucket. This is done so we can // specialize for uniformly distributed buckets and save the O(log n) binary // search cost. - def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]): Iterator[Array[Long]] = { - val counters = new Array[Long](buckets.length-1) + def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]): + Iterator[Array[Long]] = { + val counters = new Array[Long](buckets.length - 1) while (iter.hasNext) { bucketFunction(iter.next()) match { - case Some(x: Int) => {counters(x)+=1} + case Some(x: Int) => {counters(x) += 1} case _ => {} } } @@ -161,12 +168,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { } else { None } - } else if (location < buckets.length-1) { + } else if (location < buckets.length - 1) { // Exact match, just insert here Some(location) } else { // Exact match to the last element - Some(location-1) + Some(location - 1) } } // Determine the bucket function in constant time. Requires that buckets are evenly spaced @@ -175,34 +182,19 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { if (e.isNaN()) { return None } - val bucketNumber = (e-min)/(increment) + val bucketNumber = (e - min)/(increment) // We do this rather than buckets.lengthCompare(bucketNumber) // because Array[Double] fails to override it (for now). if (bucketNumber > count || bucketNumber < 0) { None } else { - Some(bucketNumber.toInt.min(count-1)) - } - } - def evenlySpaced(buckets: Array[Double]): Boolean = { - val delta = buckets(1)-buckets(0) - // Technically you could have an evenly spaced bucket with NaN - // increments but then its a single bucket and this makes the - // fastBucketFunction simpler. - if (delta.isNaN() || delta.isInfinite()) { - return false - } - for (i <- 1 to buckets.length-1) { - if (buckets(i)-buckets(i-1) != delta) { - return false - } + Some(bucketNumber.toInt.min(count - 1)) } - true } // Decide which bucket function to pass to histogramPartition. We decide here // rather than having a general function so that the decission need only be made // once rather than once per shard - val bucketFunction = if (evenlySpaced(buckets)) { + val bucketFunction = if (evenBuckets) { fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _ } else { basicBucketFunction _ diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala index 2ec7173511..071084485a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala @@ -34,134 +34,151 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext { val rdd: RDD[Double] = sc.parallelize(Seq()) val buckets: Array[Double] = Array(0.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) val expectedHistogramResults: Array[Long] = Array(0) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksWithOutOfRangeWithOneBucket") { // Verify that if all of the elements are out of range the counts are zero - val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01)) + val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01)) val buckets: Array[Double] = Array(0.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) val expectedHistogramResults: Array[Long] = Array(0) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksInRangeWithOneBucket") { // Verify the basic case of one bucket and all elements in that bucket works - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4)) val buckets: Array[Double] = Array(0.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) val expectedHistogramResults: Array[Long] = Array(4) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksInRangeWithOneBucketExactMatch") { // Verify the basic case of one bucket and all elements in that bucket works - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4)) val buckets: Array[Double] = Array(1.0, 4.0) val histogramResults: Array[Long] = rdd.histogram(buckets) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) val expectedHistogramResults: Array[Long] = Array(4) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksWithOutOfRangeWithTwoBuckets") { // Verify that out of range works with two buckets - val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01)) + val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01)) val buckets: Array[Double] = Array(0.0, 5.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(0,0) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) + val expectedHistogramResults: Array[Long] = Array(0, 0) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksWithOutOfRangeWithTwoUnEvenBuckets") { // Verify that out of range works with two un even buckets - val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01)) + val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01)) val buckets: Array[Double] = Array(0.0, 4.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(0,0) + val expectedHistogramResults: Array[Long] = Array(0, 0) assert(histogramResults === expectedHistogramResults) } test("WorksInRangeWithTwoBuckets") { // Make sure that it works with two equally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6)) val buckets: Array[Double] = Array(0.0, 5.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(3,2) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) + val expectedHistogramResults: Array[Long] = Array(3, 2) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksInRangeWithTwoBucketsAndNaN") { // Make sure that it works with two equally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6,Double.NaN)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN)) val buckets: Array[Double] = Array(0.0, 5.0, 10.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(3,2) + val histogramResults2: Array[Long] = rdd.histogram(buckets, true) + val expectedHistogramResults: Array[Long] = Array(3, 2) assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) } test("WorksInRangeWithTwoUnevenBuckets") { // Make sure that it works with two unequally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6)) val buckets: Array[Double] = Array(0.0, 5.0, 11.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(3,2) + val expectedHistogramResults: Array[Long] = Array(3, 2) assert(histogramResults === expectedHistogramResults) } test("WorksMixedRangeWithTwoUnevenBuckets") { // Make sure that it works with two unequally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.0,11.01)) + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01)) val buckets: Array[Double] = Array(0.0, 5.0, 11.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(4,3) + val expectedHistogramResults: Array[Long] = Array(4, 3) assert(histogramResults === expectedHistogramResults) } test("WorksMixedRangeWithFourUnevenBuckets") { // Make sure that it works with two unequally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1)) - val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0) + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(4,2,1,3) + val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3) assert(histogramResults === expectedHistogramResults) } test("WorksMixedRangeWithUnevenBucketsAndNaN") { // Make sure that it works with two unequally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN)) - val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0) + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1, Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(4,2,1,3) + val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3) assert(histogramResults === expectedHistogramResults) } // Make sure this works with a NaN end bucket test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") { // Make sure that it works with two unequally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN)) - val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN) + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1, Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(4,2,1,2,3) + val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 3) assert(histogramResults === expectedHistogramResults) } // Make sure this works with a NaN end bucket and an inifity test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") { // Make sure that it works with two unequally spaced buckets and elements in each - val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,1.0/0.0,-1.0/0.0,Double.NaN)) - val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN) + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1, 1.0/0.0, -1.0/0.0, Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(4,2,1,2,4) + val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 4) assert(histogramResults === expectedHistogramResults) } test("WorksWithOutOfRangeWithInfiniteBuckets") { // Verify that out of range works with two buckets - val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01,Double.NaN)) - val buckets: Array[Double] = Array(-1.0/0.0 ,0.0, 1.0/0.0) + val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01, Double.NaN)) + val buckets: Array[Double] = Array(-1.0/0.0 , 0.0, 1.0/0.0) val histogramResults: Array[Long] = rdd.histogram(buckets) - val expectedHistogramResults: Array[Long] = Array(1,1) + val expectedHistogramResults: Array[Long] = Array(1, 1) assert(histogramResults === expectedHistogramResults) } // Test the failure mode with an invalid bucket array test("ThrowsExceptionOnInvalidBucketArray") { val rdd: RDD[Double] = sc.parallelize(Seq(1.0)) // Empty array - intercept[IllegalArgumentException]{ + intercept[IllegalArgumentException] { val buckets: Array[Double] = Array.empty[Double] val result = rdd.histogram(buckets) } // Single element array - intercept[IllegalArgumentException] - { + intercept[IllegalArgumentException] { val buckets: Array[Double] = Array(1.0) val result = rdd.histogram(buckets) } @@ -170,25 +187,45 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext { // Test automatic histogram function test("WorksWithoutBucketsBasic") { // Verify the basic case of one bucket and all elements in that bucket works - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4)) val (histogramBuckets, histogramResults) = rdd.histogram(1) val expectedHistogramResults: Array[Long] = Array(4) - val expectedHistogramBuckets: Array[Double] = Array(1.0,4.0) + val expectedHistogramBuckets: Array[Double] = Array(1.0, 4.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + // Test automatic histogram function with a single element + test("WorksWithoutBucketsBasicSingleElement") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1)) + val (histogramBuckets, histogramResults) = rdd.histogram(1) + val expectedHistogramResults: Array[Long] = Array(1) + val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + // Test automatic histogram function with a single element + test("WorksWithoutBucketsBasicNoRange") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 1)) + val (histogramBuckets, histogramResults) = rdd.histogram(1) + val expectedHistogramResults: Array[Long] = Array(4) + val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0) assert(histogramResults === expectedHistogramResults) assert(histogramBuckets === expectedHistogramBuckets) } test("WorksWithoutBucketsBasicTwo") { // Verify the basic case of one bucket and all elements in that bucket works - val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4)) val (histogramBuckets, histogramResults) = rdd.histogram(2) - val expectedHistogramResults: Array[Long] = Array(2,2) - val expectedHistogramBuckets: Array[Double] = Array(1.0,2.5,4.0) + val expectedHistogramResults: Array[Long] = Array(2, 2) + val expectedHistogramBuckets: Array[Double] = Array(1.0, 2.5, 4.0) assert(histogramResults === expectedHistogramResults) assert(histogramBuckets === expectedHistogramBuckets) } test("WorksWithoutBucketsWithMoreRequestedThanElements") { // Verify the basic case of one bucket and all elements in that bucket works - val rdd: RDD[Double] = sc.parallelize(Seq(1,2)) + val rdd: RDD[Double] = sc.parallelize(Seq(1, 2)) val (histogramBuckets, histogramResults) = rdd.histogram(10) val expectedHistogramResults: Array[Long] = Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1) @@ -197,37 +234,24 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext { assert(histogramResults === expectedHistogramResults) assert(histogramBuckets === expectedHistogramBuckets) } + // Test the failure mode with an invalid RDD test("ThrowsExceptionOnInvalidRDDs") { // infinity - intercept[UnsupportedOperationException]{ - val rdd: RDD[Double] = sc.parallelize(Seq(1,1.0/0.0)) + intercept[UnsupportedOperationException] { + val rdd: RDD[Double] = sc.parallelize(Seq(1, 1.0/0.0)) val result = rdd.histogram(1) } // NaN - intercept[UnsupportedOperationException] - { - val rdd: RDD[Double] = sc.parallelize(Seq(1,Double.NaN)) + intercept[UnsupportedOperationException] { + val rdd: RDD[Double] = sc.parallelize(Seq(1, Double.NaN)) val result = rdd.histogram(1) } // Empty - intercept[UnsupportedOperationException] - { + intercept[UnsupportedOperationException] { val rdd: RDD[Double] = sc.parallelize(Seq()) val result = rdd.histogram(1) } - // Single element - intercept[UnsupportedOperationException] - { - val rdd: RDD[Double] = sc.parallelize(Seq(1)) - val result = rdd.histogram(1) - } - // No Range - intercept[UnsupportedOperationException] - { - val rdd: RDD[Double] = sc.parallelize(Seq(1,1,1)) - val result = rdd.histogram(1) - } } } |