From 21636ee4faf30126b36ad568753788327e634857 Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Tue, 1 Jan 2013 07:52:31 -0800
Subject: Test with exception while computing cached RDD.

---
 core/src/test/scala/spark/RDDSuite.scala | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

(limited to 'core')

diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 08da9a1c4d..45e6c5f840 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -88,6 +88,29 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
+  test("caching with failures") {
+    sc = new SparkContext("local", "test")
+    val onlySplit = new Split { override def index: Int = 0 }
+    var shouldFail = true
+    val rdd = new RDD[Int](sc) {
+      override def splits: Array[Split] = Array(onlySplit)
+      override val dependencies = List[Dependency[_]]()
+      override def compute(split: Split, context: TaskContext): Iterator[Int] = {
+        if (shouldFail) {
+          throw new Exception("injected failure")
+        } else {
+          return Array(1, 2, 3, 4).iterator
+        }
+      }
+    }.cache()
+    val thrown = intercept[Exception]{
+      rdd.collect()
+    }
+    assert(thrown.getMessage.contains("injected failure"))
+    shouldFail = false
+    assert(rdd.collect().toList === List(1, 2, 3, 4))
+  }
+
   test("coalesced RDDs") {
     sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)
--
cgit v1.2.3