author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-01 08:21:33 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-01 08:21:33 -0800
commit    55809fbc6db5929332cd45fa3281f9190098c6c6 (patch)
tree      823c3d2f7f0956a842e2767730a4a1eb87883ee8 /core
parent    c593f6329ee6f4f319810432c17b6d5703a3e0eb (diff)
parent    58072a7340e20251ed810457bc67a79f106bae42 (diff)
Merge pull request #349 from woggling/cache-finally
Avoid stalls when computation of cached RDD throws exception
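Why this stalled: if rdd.compute() threw, the key was never removed from the shared `loading` set, so any other thread waiting for that partition blocked forever in loading.wait(). The fix wraps the computation in try/finally so the mark is always cleared and waiters are woken. Below is a minimal, self-contained Scala sketch of that pattern; LoadingGuard, getOrCompute, and their signatures are illustrative names for this page, not Spark's actual CacheTracker API:

    import scala.collection.mutable

    // Sketch only: mark a key as "loading" so concurrent callers wait,
    // and guarantee the mark is cleared even when the computation throws.
    object LoadingGuard {
      private val loading = new mutable.HashSet[String]

      def getOrCompute[T](key: String)(compute: => T): T = {
        loading.synchronized {
          // Wait while another thread is computing this key.
          while (loading.contains(key)) loading.wait()
          loading.add(key)
        }
        try {
          compute // may throw; the finally block below must still run
        } finally {
          loading.synchronized {
            loading.remove(key) // always clear the mark, even on failure
            loading.notifyAll() // wake threads blocked in wait() above
          }
        }
      }
    }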
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala | 16
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala     | 23
2 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 3d79078733..04c26b2e40 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -202,26 +202,20 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
             loading.add(key)
           }
         }
-        // If we got here, we have to load the split
-        // Tell the master that we're doing so
-        //val host = System.getProperty("spark.hostname", Utils.localHostName)
-        //val future = trackerActor !! AddedToCache(rdd.id, split.index, host)
-        // TODO: fetch any remote copy of the split that may be available
-        // TODO: also register a listener for when it unloads
-        logInfo("Computing partition " + split)
-        val elements = new ArrayBuffer[Any]
-        elements ++= rdd.compute(split, context)
         try {
+          // If we got here, we have to load the split
+          val elements = new ArrayBuffer[Any]
+          logInfo("Computing partition " + split)
+          elements ++= rdd.compute(split, context)
           // Try to put this block in the blockManager
           blockManager.put(key, elements, storageLevel, true)
-          //future.apply() // Wait for the reply from the cache tracker
+          return elements.iterator.asInstanceOf[Iterator[T]]
         } finally {
           loading.synchronized {
             loading.remove(key)
             loading.notifyAll()
           }
         }
-        return elements.iterator.asInstanceOf[Iterator[T]]
     }
   }
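To see why the cleanup in finally matters, here is an illustrative driver for the LoadingGuard sketch above, running the same failure-then-success sequence that the regression test below checks (demo code for this page, not part of the commit):

    object LoadingGuardDemo extends App {
      var shouldFail = true
      def load(): Seq[Int] =
        if (shouldFail) throw new Exception("injected failure")
        else Seq(1, 2, 3, 4)

      try {
        LoadingGuard.getOrCompute("rdd_0_0")(load())
      } catch {
        case e: Exception => println("first attempt failed: " + e.getMessage)
      }
      shouldFail = false
      // Without the remove/notifyAll in finally, "rdd_0_0" would still be
      // marked as loading here, and this call would block forever in wait().
      println(LoadingGuard.getOrCompute("rdd_0_0")(load()))
    }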
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 08da9a1c4d..45e6c5f840 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -88,6 +88,29 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
+  test("caching with failures") {
+    sc = new SparkContext("local", "test")
+    val onlySplit = new Split { override def index: Int = 0 }
+    var shouldFail = true
+    val rdd = new RDD[Int](sc) {
+      override def splits: Array[Split] = Array(onlySplit)
+      override val dependencies = List[Dependency[_]]()
+      override def compute(split: Split, context: TaskContext): Iterator[Int] = {
+        if (shouldFail) {
+          throw new Exception("injected failure")
+        } else {
+          return Array(1, 2, 3, 4).iterator
+        }
+      }
+    }.cache()
+    val thrown = intercept[Exception]{
+      rdd.collect()
+    }
+    assert(thrown.getMessage.contains("injected failure"))
+    shouldFail = false
+    assert(rdd.collect().toList === List(1, 2, 3, 4))
+  }
+
   test("coalesced RDDs") {
     sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)