aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala8
-rw-r--r--core/src/test/scala/spark/scheduler/TaskContextSuite.scala43
-rw-r--r--run2.cmd2
3 files changed, 49 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index e492279b4e..2aad7956b4 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -15,9 +15,11 @@ private[spark] class ResultTask[T, U](
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
- val result = func(context, rdd.iterator(split, context))
- context.executeOnCompleteCallbacks()
- result
+ try {
+ func(context, rdd.iterator(split, context))
+ } finally {
+ context.executeOnCompleteCallbacks()
+ }
}
override def preferredLocations: Seq[String] = locs
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
new file mode 100644
index 0000000000..f937877340
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -0,0 +1,43 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
+ test("Calls executeOnCompleteCallbacks after failure") {
+ var completed = false
+ sc = new SparkContext("local", "test")
+ val rdd = new RDD[String](sc) {
+ override val splits = Array[Split](StubSplit(0))
+ override val dependencies = List()
+ override def compute(split: Split, context: TaskContext) = {
+ context.addOnCompleteCallback(() => completed = true)
+ sys.error("failed")
+ }
+ }
+ val func = (c: TaskContext, i: Iterator[String]) => i.next
+ val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+ intercept[RuntimeException] {
+ task.run(0)
+ }
+ assert(completed === true)
+ }
+
+ case class StubSplit(val index: Int) extends Split
+} \ No newline at end of file
diff --git a/run2.cmd b/run2.cmd
index 83464b1166..67f1e465e4 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
@echo off
-set SCALA_VERSION=2.9.1
+set SCALA_VERSION=2.9.2
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0