author     Sean Owen <sowen@cloudera.com>            2015-01-19 22:50:44 -0800
committer  Patrick Wendell <patrick@databricks.com>  2015-01-19 22:50:45 -0800
commit     306ff187af0c49f61f4bc1850021561397b4f8f1 (patch)
tree       3b6fc4dbaaf50e8437c4e9a83ce85826380237d2 /core
parent     e69fb8c75aab7b95abf03785c3b2f1384373003a (diff)
SPARK-5270 [CORE] Provide isEmpty() function in RDD API
Pretty minor, but submitted for consideration -- this would at least help people make this check in the most efficient way I know.

Author: Sean Owen <sowen@cloudera.com>

Closes #4074 from srowen/SPARK-5270 and squashes the following commits:

66885b8 [Sean Owen] Add note that JavaRDDLike should not be implemented by user code
2e9b490 [Sean Owen] More tests, and Mima-exclude the new isEmpty method in JavaRDDLike
28395ff [Sean Owen] Add isEmpty to Java, Python
7dd04b7 [Sean Owen] Add efficient RDD.isEmpty()
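As a point of reference for the efficiency claim above, a minimal usage sketch in Scala (assumes an existing local SparkContext named sc; this snippet is an illustration, not part of the commit):

// Hypothetical usage, assuming an existing SparkContext `sc`.
val words = sc.parallelize(Seq("a", "b", "c"))

// Before this commit, the common idiom forced a full scan of every partition:
val viaCount = words.count() == 0

// The new method needs at most a small take(1) job:
val viaIsEmpty = words.isEmpty()

assert(viaCount == viaIsEmpty)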
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala               |  6
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java            | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala          |  9
4 files changed, 46 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index bd451634e5..62bf18d82d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
+/**
+ * Defines operations common to several Java RDD implementations.
+ * Note that this trait is not intended to be implemented by user code.
+ */
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
@@ -436,6 +440,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def first(): T = rdd.first()
/**
+ * @return true if and only if the RDD contains no elements at all. Note that an RDD
+ * may be empty even when it has at least 1 partition.
+ */
+ def isEmpty(): Boolean = rdd.isEmpty()
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit = {
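The Java binding above is a one-line delegation. Per the squashed commits, the new method is Mima-excluded: adding a method to JavaRDDLike would otherwise flag a binary-compatibility break for anyone implementing the trait, hence the new note that user code should not implement it. A hedged sketch of calling the Java API from Scala (illustration only; the JavaSparkContext setup is assumed, not from the commit):

import java.util.Arrays
import org.apache.spark.api.java.JavaSparkContext

// Illustration: JavaRDD.isEmpty() simply forwards to the Scala RDD.isEmpty().
val jsc = new JavaSparkContext("local[2]", "isEmpty-demo")
assert(!jsc.parallelize(Arrays.asList(1, 2, 3)).isEmpty())
assert(jsc.parallelize(new java.util.ArrayList[Integer]()).isEmpty())
jsc.stop()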
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5118e2b911..97012c7033 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1176,6 +1176,12 @@ abstract class RDD[T: ClassTag](
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
/**
+ * @return true if and only if the RDD contains no elements at all. Note that an RDD
+ * may be empty even when it has at least 1 partition.
+ */
+ def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
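The Scala implementation above short-circuits in two ways: an RDD with no partitions is declared empty without launching a job at all, and otherwise take(1) stops scanning partitions as soon as one element turns up, so a non-empty RDD is usually confirmed without a full pass. A minimal sketch of both paths (assumes a local SparkContext `sc`; illustration only):

// No partitions: answered from driver-side metadata, no job is launched.
assert(sc.emptyRDD[Int].isEmpty())

// take(1) finds an element in the first scanned partition and stops,
// so even a large RDD is confirmed non-empty cheaply.
assert(!sc.parallelize(1 to 1000000, 100).isEmpty())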
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 07b1e44d04..004de05c10 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -607,6 +607,27 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void isEmpty() {
+ Assert.assertTrue(sc.emptyRDD().isEmpty());
+ Assert.assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
+ Assert.assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
+ Assert.assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
+ new Function<Integer,Boolean>() {
+ @Override
+ public Boolean call(Integer i) {
+ return i < 0;
+ }
+ }).isEmpty());
+ Assert.assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
+ new Function<Integer, Boolean>() {
+ @Override
+ public Boolean call(Integer i) {
+ return i > 1;
+ }
+ }).isEmpty());
+ }
+
+ @Test
public void cartesian() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0deb9b18b8..381ee2d456 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -52,6 +52,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+ assert(!nums.isEmpty())
assert(nums.max() === 4)
assert(nums.min() === 1)
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
@@ -545,6 +546,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedTopK === nums.sorted(ord).take(5))
}
+ test("isEmpty") {
+ assert(sc.emptyRDD.isEmpty())
+ assert(sc.parallelize(Seq[Int]()).isEmpty())
+ assert(!sc.parallelize(Seq(1)).isEmpty())
+ assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())
+ assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty())
+ }
+
test("sample preserves partitioner") {
val partitioner = new HashPartitioner(2)
val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
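One caveat from the new doc comment is worth seeing concretely: emptiness is a property of the data, not the partitioning, so an RDD can be empty while still having partitions. A minimal sketch (assumes a local SparkContext `sc`; illustration only):

// Three partitions survive the filter, but no elements do.
val filtered = sc.parallelize(Seq(1, 2, 3), 3).filter(_ < 0)
assert(filtered.partitions.length == 3)
assert(filtered.isEmpty())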