aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Holden Karau <holden@pigscanfly.ca> 2013-10-21 00:10:03 -0700
committer Holden Karau <holden@pigscanfly.ca> 2013-10-21 00:10:03 -0700
commit 699f7d28c0347cb516fa17f94b53d7bc50f18346 (patch)
tree e35d59f5434eea37f57c35ea38da919f8ad7483d /core
parent e58c69d955ef8faacb794a0c1666b21c1606453e (diff)
downloadspark-699f7d28c0347cb516fa17f94b53d7bc50f18346.tar.gz
spark-699f7d28c0347cb516fa17f94b53d7bc50f18346.tar.bz2
spark-699f7d28c0347cb516fa17f94b53d7bc50f18346.zip
CR feedback
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala 18
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala 68
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala 140
3 files changed, 125 insertions, 101 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index d2a2818e59..b002468442 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -167,12 +167,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
* value is 0 and the max is 100 and there are two buckets the resulting
* buckets will be [0,50) [50,100]. bucketCount must be at least 1
* If the RDD contains infinity, NaN throws an exception
- * If the elements in RDD do not vary (max == min) throws an exception
+ * If the elements in RDD do not vary (max == min) always returns a single bucket.
*/
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
val result = srdd.histogram(bucketCount)
(result._1.map(scala.Double.box(_)), result._2)
}
+
/**
* Compute a histogram using the provided buckets. The buckets are all open
* to the left except for the last which is closed
@@ -181,14 +182,21 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
* e.g 1<=x<10 , 10<=x<20, 20<=x<50
* And on the input of 1 and 50 we would have a histogram of 1,0,0
*
- * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches
- * from an O(log n) inseration to O(1) per element. (where n = # buckets)
+ * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
+ * from an O(log n) insertion to O(1) per element (where n = # buckets) if you set evenBuckets
+ * to true.
* buckets must be sorted and not contain any duplicates.
* buckets array must be at least two elements
- * All NaN entries are treated the same.
+ * All NaN entries are treated the same. If you have a NaN bucket it must be
+ * the last element of the buckets array (NaN is treated as the maximum value),
+ * and all NaN entries will be counted in that bucket.
*/
def histogram(buckets: Array[Double]): Array[Long] = {
- srdd.histogram(buckets.map(_.toDouble))
+ srdd.histogram(buckets.map(_.toDouble), false)
+ }
+
+ def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = {
+ srdd.histogram(buckets.map(_.toDouble), evenBuckets)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 776a83cefe..33738ee094 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -83,44 +83,50 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* Compute a histogram of the data using bucketCount number of buckets evenly
* spaced between the minimum and maximum of the RDD. For example if the min
* value is 0 and the max is 100 and there are two buckets the resulting
- * buckets will be [0,50) [50,100]. bucketCount must be at least 1
+ * buckets will be [0, 50) [50, 100]. bucketCount must be at least 1
* If the RDD contains infinity, NaN throws an exception
- * If the elements in RDD do not vary (max == min) throws an exception
+ * If the elements in RDD do not vary (max == min) always returns a single bucket.
*/
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
// Compute the minimum and the maxium
val (max: Double, min: Double) = self.mapPartitions { items =>
Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) =>
- (x._1.max(e),x._2.min(e))))
+ (x._1.max(e), x._2.min(e))))
}.reduce { (maxmin1, maxmin2) =>
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
if (max.isNaN() || max.isInfinity || min.isInfinity ) {
- throw new UnsupportedOperationException("Histogram on either an empty RDD or RDD containing +-infinity or NaN")
- }
- if (max == min) {
- throw new UnsupportedOperationException("Histogram with no range in elements")
+ throw new UnsupportedOperationException(
+ "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
val increment: Double = (max-min)/bucketCount.toDouble
- val range = Range.Double.inclusive(min, max, increment)
+ val range = if (increment != 0) {
+ Range.Double.inclusive(min, max, increment)
+ } else {
+ List(min, min)
+ }
val buckets: Array[Double] = range.toArray
- (buckets,histogram(buckets))
+ (buckets, histogram(buckets, true))
}
+
/**
* Compute a histogram using the provided buckets. The buckets are all open
* to the left except for the last which is closed
* e.g. for the array
- * [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
+ * [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50]
* e.g 1<=x<10 , 10<=x<20, 20<=x<50
- * And on the input of 1 and 50 we would have a histogram of 1,0,0
+ * And on the input of 1 and 50 we would have a histogram of 1, 0, 0
*
- * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches
- * from an O(log n) inseration to O(1) per element. (where n = # buckets)
+ * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
+ * from an O(log n) insertion to O(1) per element (where n = # buckets) if you set evenBuckets
+ * to true.
* buckets must be sorted and not contain any duplicates.
* buckets array must be at least two elements
- * All NaN entries are treated the same.
+ * All NaN entries are treated the same. If you have a NaN bucket it must be
+ * the last element of the buckets array (NaN is treated as the maximum value),
+ * and all NaN entries will be counted in that bucket.
*/
- def histogram(buckets: Array[Double]): Array[Long] = {
+ def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] = {
if (buckets.length < 2) {
throw new IllegalArgumentException("buckets array must have at least two elements")
}
@@ -129,11 +135,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
// to increment or returns None if there is no bucket. This is done so we can
// specialize for uniformly distributed buckets and save the O(log n) binary
// search cost.
- def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]): Iterator[Array[Long]] = {
- val counters = new Array[Long](buckets.length-1)
+ def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]):
+ Iterator[Array[Long]] = {
+ val counters = new Array[Long](buckets.length - 1)
while (iter.hasNext) {
bucketFunction(iter.next()) match {
- case Some(x: Int) => {counters(x)+=1}
+ case Some(x: Int) => {counters(x) += 1}
case _ => {}
}
}
@@ -161,12 +168,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
} else {
None
}
- } else if (location < buckets.length-1) {
+ } else if (location < buckets.length - 1) {
// Exact match, just insert here
Some(location)
} else {
// Exact match to the last element
- Some(location-1)
+ Some(location - 1)
}
}
// Determine the bucket function in constant time. Requires that buckets are evenly spaced
@@ -175,34 +182,19 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
if (e.isNaN()) {
return None
}
- val bucketNumber = (e-min)/(increment)
+ val bucketNumber = (e - min)/(increment)
// We do this rather than buckets.lengthCompare(bucketNumber)
// because Array[Double] fails to override it (for now).
if (bucketNumber > count || bucketNumber < 0) {
None
} else {
- Some(bucketNumber.toInt.min(count-1))
- }
- }
- def evenlySpaced(buckets: Array[Double]): Boolean = {
- val delta = buckets(1)-buckets(0)
- // Technically you could have an evenly spaced bucket with NaN
- // increments but then its a single bucket and this makes the
- // fastBucketFunction simpler.
- if (delta.isNaN() || delta.isInfinite()) {
- return false
- }
- for (i <- 1 to buckets.length-1) {
- if (buckets(i)-buckets(i-1) != delta) {
- return false
- }
+ Some(bucketNumber.toInt.min(count - 1))
}
- true
}
// Decide which bucket function to pass to histogramPartition. We decide here
// rather than having a general function so that the decission need only be made
// once rather than once per shard
- val bucketFunction = if (evenlySpaced(buckets)) {
+ val bucketFunction = if (evenBuckets) {
fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
} else {
basicBucketFunction _
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 2ec7173511..071084485a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -34,134 +34,151 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
val rdd: RDD[Double] = sc.parallelize(Seq())
val buckets: Array[Double] = Array(0.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
val expectedHistogramResults: Array[Long] = Array(0)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksWithOutOfRangeWithOneBucket") {
// Verify that if all of the elements are out of range the counts are zero
- val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+ val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
val buckets: Array[Double] = Array(0.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
val expectedHistogramResults: Array[Long] = Array(0)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksInRangeWithOneBucket") {
// Verify the basic case of one bucket and all elements in that bucket works
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
val buckets: Array[Double] = Array(0.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
val expectedHistogramResults: Array[Long] = Array(4)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksInRangeWithOneBucketExactMatch") {
// Verify the basic case of one bucket and all elements in that bucket works
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
val buckets: Array[Double] = Array(1.0, 4.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
val expectedHistogramResults: Array[Long] = Array(4)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksWithOutOfRangeWithTwoBuckets") {
// Verify that out of range works with two buckets
- val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+ val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(0,0)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
+ val expectedHistogramResults: Array[Long] = Array(0, 0)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksWithOutOfRangeWithTwoUnEvenBuckets") {
// Verify that out of range works with two un even buckets
- val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+ val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
val buckets: Array[Double] = Array(0.0, 4.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(0,0)
+ val expectedHistogramResults: Array[Long] = Array(0, 0)
assert(histogramResults === expectedHistogramResults)
}
test("WorksInRangeWithTwoBuckets") {
// Make sure that it works with two equally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6))
val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(3,2)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
+ val expectedHistogramResults: Array[Long] = Array(3, 2)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksInRangeWithTwoBucketsAndNaN") {
// Make sure that it works with two equally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6,Double.NaN))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN))
val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(3,2)
+ val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
+ val expectedHistogramResults: Array[Long] = Array(3, 2)
assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
}
test("WorksInRangeWithTwoUnevenBuckets") {
// Make sure that it works with two unequally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6))
val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(3,2)
+ val expectedHistogramResults: Array[Long] = Array(3, 2)
assert(histogramResults === expectedHistogramResults)
}
test("WorksMixedRangeWithTwoUnevenBuckets") {
// Make sure that it works with two unequally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.0,11.01))
+ val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01))
val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(4,3)
+ val expectedHistogramResults: Array[Long] = Array(4, 3)
assert(histogramResults === expectedHistogramResults)
}
test("WorksMixedRangeWithFourUnevenBuckets") {
// Make sure that it works with two unequally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1))
- val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0)
+ val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1))
+ val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(4,2,1,3)
+ val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3)
assert(histogramResults === expectedHistogramResults)
}
test("WorksMixedRangeWithUnevenBucketsAndNaN") {
// Make sure that it works with two unequally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN))
- val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0)
+ val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1, Double.NaN))
+ val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(4,2,1,3)
+ val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3)
assert(histogramResults === expectedHistogramResults)
}
// Make sure this works with a NaN end bucket
test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") {
// Make sure that it works with two unequally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN))
- val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN)
+ val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1, Double.NaN))
+ val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(4,2,1,2,3)
+ val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 3)
assert(histogramResults === expectedHistogramResults)
}
// Make sure this works with a NaN end bucket and an inifity
test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") {
// Make sure that it works with two unequally spaced buckets and elements in each
- val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,1.0/0.0,-1.0/0.0,Double.NaN))
- val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN)
+ val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1, 1.0/0.0, -1.0/0.0, Double.NaN))
+ val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(4,2,1,2,4)
+ val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 4)
assert(histogramResults === expectedHistogramResults)
}
test("WorksWithOutOfRangeWithInfiniteBuckets") {
// Verify that out of range works with two buckets
- val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01,Double.NaN))
- val buckets: Array[Double] = Array(-1.0/0.0 ,0.0, 1.0/0.0)
+ val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
+ val buckets: Array[Double] = Array(-1.0/0.0 , 0.0, 1.0/0.0)
val histogramResults: Array[Long] = rdd.histogram(buckets)
- val expectedHistogramResults: Array[Long] = Array(1,1)
+ val expectedHistogramResults: Array[Long] = Array(1, 1)
assert(histogramResults === expectedHistogramResults)
}
// Test the failure mode with an invalid bucket array
test("ThrowsExceptionOnInvalidBucketArray") {
val rdd: RDD[Double] = sc.parallelize(Seq(1.0))
// Empty array
- intercept[IllegalArgumentException]{
+ intercept[IllegalArgumentException] {
val buckets: Array[Double] = Array.empty[Double]
val result = rdd.histogram(buckets)
}
// Single element array
- intercept[IllegalArgumentException]
- {
+ intercept[IllegalArgumentException] {
val buckets: Array[Double] = Array(1.0)
val result = rdd.histogram(buckets)
}
@@ -170,25 +187,45 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
// Test automatic histogram function
test("WorksWithoutBucketsBasic") {
// Verify the basic case of one bucket and all elements in that bucket works
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
val (histogramBuckets, histogramResults) = rdd.histogram(1)
val expectedHistogramResults: Array[Long] = Array(4)
- val expectedHistogramBuckets: Array[Double] = Array(1.0,4.0)
+ val expectedHistogramBuckets: Array[Double] = Array(1.0, 4.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+ // Test automatic histogram function with a single element
+ test("WorksWithoutBucketsBasicSingleElement") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd: RDD[Double] = sc.parallelize(Seq(1))
+ val (histogramBuckets, histogramResults) = rdd.histogram(1)
+ val expectedHistogramResults: Array[Long] = Array(1)
+ val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+ // Test automatic histogram function with several identical elements (no range)
+ test("WorksWithoutBucketsBasicNoRange") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 1))
+ val (histogramBuckets, histogramResults) = rdd.histogram(1)
+ val expectedHistogramResults: Array[Long] = Array(4)
+ val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets === expectedHistogramBuckets)
}
test("WorksWithoutBucketsBasicTwo") {
// Verify the basic case of one bucket and all elements in that bucket works
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
val (histogramBuckets, histogramResults) = rdd.histogram(2)
- val expectedHistogramResults: Array[Long] = Array(2,2)
- val expectedHistogramBuckets: Array[Double] = Array(1.0,2.5,4.0)
+ val expectedHistogramResults: Array[Long] = Array(2, 2)
+ val expectedHistogramBuckets: Array[Double] = Array(1.0, 2.5, 4.0)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets === expectedHistogramBuckets)
}
test("WorksWithoutBucketsWithMoreRequestedThanElements") {
// Verify the basic case of one bucket and all elements in that bucket works
- val rdd: RDD[Double] = sc.parallelize(Seq(1,2))
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 2))
val (histogramBuckets, histogramResults) = rdd.histogram(10)
val expectedHistogramResults: Array[Long] =
Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1)
@@ -197,37 +234,24 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets === expectedHistogramBuckets)
}
+
// Test the failure mode with an invalid RDD
test("ThrowsExceptionOnInvalidRDDs") {
// infinity
- intercept[UnsupportedOperationException]{
- val rdd: RDD[Double] = sc.parallelize(Seq(1,1.0/0.0))
+ intercept[UnsupportedOperationException] {
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, 1.0/0.0))
val result = rdd.histogram(1)
}
// NaN
- intercept[UnsupportedOperationException]
- {
- val rdd: RDD[Double] = sc.parallelize(Seq(1,Double.NaN))
+ intercept[UnsupportedOperationException] {
+ val rdd: RDD[Double] = sc.parallelize(Seq(1, Double.NaN))
val result = rdd.histogram(1)
}
// Empty
- intercept[UnsupportedOperationException]
- {
+ intercept[UnsupportedOperationException] {
val rdd: RDD[Double] = sc.parallelize(Seq())
val result = rdd.histogram(1)
}
- // Single element
- intercept[UnsupportedOperationException]
- {
- val rdd: RDD[Double] = sc.parallelize(Seq(1))
- val result = rdd.histogram(1)
- }
- // No Range
- intercept[UnsupportedOperationException]
- {
- val rdd: RDD[Double] = sc.parallelize(Seq(1,1,1))
- val result = rdd.histogram(1)
- }
}
}