author    Reynold Xin <rxin@cs.berkeley.edu>  2013-05-06 15:35:18 -0700
committer Reynold Xin <rxin@cs.berkeley.edu>  2013-05-06 15:40:34 -0700
commit    0fd84965f66aa37d2ae14da799b86a5c8ed1cb32
tree      8e521f3b57537cb8bd3512a53135d7b04eff4622
parent    e93805f2b115dab07145af3126fa88d2989f2c48
Added EmptyRDD.
 core/src/main/scala/spark/rdd/EmptyRDD.scala | 16 ++++++++++++++++
 core/src/test/scala/spark/RDDSuite.scala     | 14 +++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
new file mode 100644
index 0000000000..e4dd3a7fa7
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+
+
+/**
+ * An RDD that is empty, i.e. has no elements in it.
+ */
+class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = Array.empty
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    throw new UnsupportedOperationException("empty RDD")
+  }
+}
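A quick usage note, not part of this commit: because getPartitions returns an empty array, an EmptyRDD schedules no tasks, so compute is never invoked on a normal run and exists only to fail loudly if something does try to iterate a nonexistent partition. Below is a minimal sketch of one way the class could be used, as a neutral element when unioning a variable number of RDDs; it assumes a local SparkContext and the 0.7-era spark package layout, and EmptyRDDExample is a hypothetical name, not code from this commit:

import spark.{RDD, SparkContext}
import spark.rdd.EmptyRDD

object EmptyRDDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "empty-rdd-example")

    // A possibly-empty collection of RDDs to combine.
    val rdds: Seq[RDD[Int]] = Seq(sc.parallelize(1 to 3), sc.parallelize(4 to 6))

    // EmptyRDD contributes zero partitions, so it is a safe identity
    // element for union even when `rdds` itself is empty.
    val combined = rdds.foldLeft(new EmptyRDD[Int](sc): RDD[Int])(_ union _)

    assert(combined.count() == 6)
    sc.stop()
  }
}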
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cee6312572..2ce757b13c 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -5,7 +5,7 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.time.{Span, Millis}
 import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
 
 class RDDSuite extends FunSuite with LocalSparkContext {
@@ -147,6 +147,18 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
+  test("empty RDD") {
+    sc = new SparkContext("local", "test")
+    val empty = new EmptyRDD[Int](sc)
+    assert(empty.count === 0)
+    assert(empty.collect().size === 0)
+
+    val thrown = intercept[UnsupportedOperationException] {
+      empty.reduce(_ + _)
+    }
+    assert(thrown.getMessage.contains("empty"))
+  }
+
   test("cogrouped RDDs") {
     sc = new SparkContext("local", "test")
     val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)