From 0fd84965f66aa37d2ae14da799b86a5c8ed1cb32 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 6 May 2013 15:35:18 -0700 Subject: Added EmptyRDD. --- core/src/main/scala/spark/rdd/EmptyRDD.scala | 16 ++++++++++++++++ core/src/test/scala/spark/RDDSuite.scala | 14 +++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/spark/rdd/EmptyRDD.scala diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala new file mode 100644 index 0000000000..e4dd3a7fa7 --- /dev/null +++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala @@ -0,0 +1,16 @@ +package spark.rdd + +import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext} + + +/** + * An RDD that is empty, i.e. has no element in it. + */ +class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) { + + override def getPartitions: Array[Partition] = Array.empty + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + throw new UnsupportedOperationException("empty RDD") + } +} diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index cee6312572..2ce757b13c 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.FunSuite import org.scalatest.concurrent.Timeouts._ import org.scalatest.time.{Span, Millis} import spark.SparkContext._ -import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD} +import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD} class RDDSuite extends FunSuite with LocalSparkContext { @@ -147,6 +147,18 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } + test("empty RDD") { + sc = new SparkContext("local", "test") + val empty = new EmptyRDD[Int](sc) + assert(empty.count === 0) + assert(empty.collect().size === 0) + + val thrown = intercept[UnsupportedOperationException]{ + empty.reduce(_+_) + } + assert(thrown.getMessage.contains("empty")) + } + test("cogrouped RDDs") { sc = new SparkContext("local", "test") val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2) -- cgit v1.2.3 From 64d4d2b036447f42bfcd3bac5687c79a3b0661ca Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 6 May 2013 16:30:46 -0700 Subject: Added tests for joins, cogroups, and unions for EmptyRDD. --- core/src/test/scala/spark/RDDSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 2ce757b13c..a761dd77c5 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -157,6 +157,14 @@ class RDDSuite extends FunSuite with LocalSparkContext { empty.reduce(_+_) } assert(thrown.getMessage.contains("empty")) + + val emptyKv = new EmptyRDD[(Int, Int)](sc) + val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x)) + assert(rdd.join(emptyKv).collect().size === 0) + assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) + assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) + assert(rdd.cogroup(emptyKv).collect().size === 2) + assert(rdd.union(emptyKv).collect().size === 2) } test("cogrouped RDDs") { -- cgit v1.2.3