aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2013-01-01 07:52:31 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2013-01-01 08:07:40 -0800
commit21636ee4faf30126b36ad568753788327e634857 (patch)
tree82b9d1ac9c580d477718dad08e54c568145569f0 /core
parentfeadaf72f44e7c66521c03171592671d4c441bda (diff)
downloadspark-21636ee4faf30126b36ad568753788327e634857.tar.gz
spark-21636ee4faf30126b36ad568753788327e634857.tar.bz2
spark-21636ee4faf30126b36ad568753788327e634857.zip
Test with exception while computing cached RDD.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala23
1 files changed, 23 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 08da9a1c4d..45e6c5f840 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -88,6 +88,29 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("caching with failures") {
+ sc = new SparkContext("local", "test")
+ val onlySplit = new Split { override def index: Int = 0 }
+ var shouldFail = true
+ val rdd = new RDD[Int](sc) {
+ override def splits: Array[Split] = Array(onlySplit)
+ override val dependencies = List[Dependency[_]]()
+ override def compute(split: Split, context: TaskContext): Iterator[Int] = {
+ if (shouldFail) {
+ throw new Exception("injected failure")
+ } else {
+ return Array(1, 2, 3, 4).iterator
+ }
+ }
+ }.cache()
+ val thrown = intercept[Exception]{
+ rdd.collect()
+ }
+ assert(thrown.getMessage.contains("injected failure"))
+ shouldFail = false
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ }
+
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)