aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-05-13 21:45:36 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-05-13 21:45:36 -0700
commit016ac868303adbffb19165430610869363fa943f (patch)
tree6fc8eba2035c12e1c298435e3613edde5d4e2aac
parentb9aef263dfca64ddf24f4ac0b7ad43460d4706e4 (diff)
parent64d4d2b036447f42bfcd3bac5687c79a3b0661ca (diff)
downloadspark-016ac868303adbffb19165430610869363fa943f.tar.gz
spark-016ac868303adbffb19165430610869363fa943f.tar.bz2
spark-016ac868303adbffb19165430610869363fa943f.zip
Merge pull request #601 from rxin/emptyrdd-master
EmptyRDD (master branch 0.8)
-rw-r--r--core/src/main/scala/spark/rdd/EmptyRDD.scala16
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala22
2 files changed, 37 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
new file mode 100644
index 0000000000..e4dd3a7fa7
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+
+
+/**
+ * An RDD with no partitions and hence no elements; `compute` throws if it is ever called.
+ */
+class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+ // Always reports an empty partition array.
+ override def getPartitions: Array[Partition] = Array.empty
+ // Unconditionally throws; presumably unreachable since there are no partitions (TODO confirm).
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ throw new UnsupportedOperationException("empty RDD")
+ }
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cee6312572..a761dd77c5 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -5,7 +5,7 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -147,6 +147,26 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("empty RDD") {
+ sc = new SparkContext("local", "test")
+ val empty = new EmptyRDD[Int](sc)
+ assert(empty.count === 0)
+ assert(empty.collect().size === 0)
+ // reduce on the empty RDD is expected to throw, with a message that mentions "empty".
+ val thrown = intercept[UnsupportedOperationException]{
+ empty.reduce(_+_)
+ }
+ assert(thrown.getMessage.contains("empty"))
+ // Combine a two-element pair RDD (keys 1 and 2) with an empty pair RDD.
+ val emptyKv = new EmptyRDD[(Int, Int)](sc)
+ val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x))
+ assert(rdd.join(emptyKv).collect().size === 0) // inner join: expects no results
+ assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) // right side is empty: expects no results
+ assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) // expects both left-side entries
+ assert(rdd.cogroup(emptyKv).collect().size === 2) // expects one group per left-side key
+ assert(rdd.union(emptyKv).collect().size === 2) // expects only the two left-side elements
+ }
+
test("cogrouped RDDs") {
sc = new SparkContext("local", "test")
val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)