author    Sean Owen <sowen@cloudera.com>  2015-02-20 10:21:39 +0000
committer Sean Owen <sowen@cloudera.com>  2015-02-20 10:21:39 +0000
commit    d3dfebebce9f76e4433e16d4d6d29fb8fa4d4193 (patch)
tree      194484be85c9bde9fe2f83b0b0574aadf0b1c484
parent    70bfb5c7282df84e76eba01f59bf1b8551583c33 (diff)
SPARK-5744 [CORE] Take 2. RDD.isEmpty / take fails for (empty) RDD of Nothing
Follow-on to https://github.com/apache/spark/pull/4591

Document isEmpty / take / parallelize and their interaction with (an empty)
RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with
histogram() and EmptyRDD.

CC rxin since you reviewed the last one, although I imagine this is an
uncontroversial resolution.

Author: Sean Owen <sowen@cloudera.com>

Closes #4698 from srowen/SPARK-5744.2 and squashes the following commits:

9b2a811 [Sean Owen] 2 extra javadoc fixes
d1b9fba [Sean Owen] Document isEmpty / take / parallelize and their interaction
with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related
minor issue with histogram() and EmptyRDD.
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala           |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                |  7
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java             |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala     |  3
5 files changed, 24 insertions(+), 1 deletion(-)
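Before the diffs, a minimal sketch of the behavior this commit documents, written as a hypothetical spark-shell session (not part of the patch; `sc` is the shell's SparkContext, and the exact exception raised in the RDD[Nothing] case varies by version):

    val untyped = sc.parallelize(Seq())     // Seq() is Seq[Nothing], so this is RDD[Nothing]
    // untyped.isEmpty                      // raises an exception; see the take()/isEmpty notes below
    val typed = sc.parallelize(Seq[Int]())  // RDD[Int] with empty partitions
    typed.isEmpty                           // true
    sc.emptyRDD[Int].isEmpty                // true: an RDD with no partitions at all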
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d59b466830..85ec5ea113 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -548,6 +548,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
    * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
    * modified collection. Pass a copy of the argument to avoid this.
+   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
+   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     assertNotStopped()
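To make the new @note concrete, a hedged sketch of the difference between the two recommended alternatives (a hypothetical spark-shell session; the partition counts shown assume the default parallelism behavior of parallelize):

    sc.emptyRDD[Int].partitions.length              // 0: no partitions at all
    sc.parallelize(Seq[Int]()).partitions.length    // defaultParallelism: that many empty partitions
    sc.parallelize(Seq[Int](), 4).partitions.length // 4 empty partitions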
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e66f83bb34..03afc28973 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -213,7 +213,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     } else {
       basicBucketFunction _
     }
-    self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    if (self.partitions.length == 0) {
+      new Array[Long](buckets.length - 1)
+    } else {
+      // reduce() requires a non-empty RDD. This works because the mapPartitions will make
+      // non-empty partitions out of empty ones. But it doesn't handle the no-partitions case,
+      // which is handled just above
+      self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    }
   }
 }
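The guard above matters because reduce() throws when an RDD yields no partition results. A sketch of the fixed behavior (hypothetical session; the bucket boundaries are illustrative only):

    val empty = sc.emptyRDD[Double]         // zero partitions
    empty.histogram(Array(0.0, 5.0, 10.0))  // Array(0L, 0L): one zero count per bucket
    // Before this change, the call fell through to reduce() on a partitionless RDD,
    // which fails with an "empty collection" error.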
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3ab9e54f0e..cf0433010a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,6 +1146,9 @@ abstract class RDD[T: ClassTag](
    * Take the first num elements of the RDD. It works by first scanning one partition, and using
    * the results from that partition to estimate the number of additional partitions needed to
    * satisfy the limit.
+   *
+   * @note due to complications in the internal implementation, this method will raise
+   * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = {
     if (num == 0) {
@@ -1258,6 +1261,10 @@ abstract class RDD[T: ClassTag](
   def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)

   /**
+   * @note due to complications in the internal implementation, this method will raise an
+   * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
+   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
+   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
    * @return true if and only if the RDD contains no elements at all. Note that an RDD
    * may be empty even when it has at least 1 partition.
    */
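A sketch of the pitfall both notes describe (hypothetical spark-shell session; isEmpty is implemented in terms of take(1), so both fail the same way on an RDD[Nothing]):

    val rdd = sc.parallelize(Seq())  // inferred as RDD[Nothing]
    rdd.count()                      // 0: count never builds an Array[T], so it is safe
    // rdd.take(1)                   // raises an exception: the result cannot be
    //                               // materialized as an Array[Nothing]
    sc.parallelize(Seq[String]()).isEmpty  // true: stating an element type avoids the problem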
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b16a1e9460..74e88c767e 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -708,6 +708,10 @@ public class JavaAPISuite implements Serializable {
     // Test with provided buckets
     long[] histogram = rdd.histogram(expected_buckets);
     Assert.assertArrayEquals(expected_counts, histogram);
+    // SPARK-5744
+    Assert.assertArrayEquals(
+        new long[] {0},
+        sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
   }

   @Test
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index de30653375..4cd0f97368 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -33,6 +33,9 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
+    val emptyRDD: RDD[Double] = sc.emptyRDD
+    assert(emptyRDD.histogram(buckets) === expectedHistogramResults)
+    assert(emptyRDD.histogram(buckets, true) === expectedHistogramResults)
   }

   test("WorksWithOutOfRangeWithOneBucket") {