diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-05-13 21:45:36 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-05-13 21:45:36 -0700 |
commit | 016ac868303adbffb19165430610869363fa943f (patch) | |
tree | 6fc8eba2035c12e1c298435e3613edde5d4e2aac /core | |
parent | b9aef263dfca64ddf24f4ac0b7ad43460d4706e4 (diff) | |
parent | 64d4d2b036447f42bfcd3bac5687c79a3b0661ca (diff) | |
download | spark-016ac868303adbffb19165430610869363fa943f.tar.gz spark-016ac868303adbffb19165430610869363fa943f.tar.bz2 spark-016ac868303adbffb19165430610869363fa943f.zip |
Merge pull request #601 from rxin/emptyrdd-master
EmptyRDD (master branch 0.8)
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/EmptyRDD.scala | 16 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 22 |
2 files changed, 37 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala new file mode 100644 index 0000000000..e4dd3a7fa7 --- /dev/null +++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala @@ -0,0 +1,16 @@ +package spark.rdd + +import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext} + + +/** + * An RDD that is empty, i.e. has no element in it. + */ +class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) { + + override def getPartitions: Array[Partition] = Array.empty + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + throw new UnsupportedOperationException("empty RDD") + } +} diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index cee6312572..a761dd77c5 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.FunSuite import org.scalatest.concurrent.Timeouts._ import org.scalatest.time.{Span, Millis} import spark.SparkContext._ -import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD} +import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD} class RDDSuite extends FunSuite with LocalSparkContext { @@ -147,6 +147,26 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } + test("empty RDD") { + sc = new SparkContext("local", "test") + val empty = new EmptyRDD[Int](sc) + assert(empty.count === 0) + assert(empty.collect().size === 0) + + val thrown = intercept[UnsupportedOperationException]{ + empty.reduce(_+_) + } + assert(thrown.getMessage.contains("empty")) + + val emptyKv = new EmptyRDD[(Int, Int)](sc) + val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x)) + assert(rdd.join(emptyKv).collect().size === 0) + assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) + assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) + assert(rdd.cogroup(emptyKv).collect().size === 2) + assert(rdd.union(emptyKv).collect().size === 2) + } + test("cogrouped RDDs") { sc = new SparkContext("local", "test") val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2) |