aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/Executor.scala7
-rw-r--r--core/src/main/scala/spark/SimpleJob.scala2
-rw-r--r--examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala18
3 files changed, 27 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index e1256a229e..31ba122baf 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
.build())
}
case t: Throwable => {
+ val reason = OtherFailure(t.toString())
+ d.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(desc.getTaskId)
+ .setState(TaskState.TASK_FAILED)
+ .setData(ByteString.copyFrom(Utils.serialize(reason)))
+ .build())
+
// TODO: Handle errors in tasks less dramatically
logError("Exception in task ID " + tid, t)
System.exit(1)
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index d982a75ba0..6a27f159c4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -229,6 +229,8 @@ extends Job(jobId) with Logging
if (tasksFinished == numTasks)
sched.jobFinished(this)
return
+ case otherFailure: OtherFailure =>
+ logInfo("Loss was due to %s".format(otherFailure.message))
case _ => {}
}
}
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
new file mode 100644
index 0000000000..46f658eab2
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark.SparkContext
+
+object ExceptionHandlingTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: ExceptionHandlingTest <host>")
+ System.exit(1)
+ }
+
+ val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+ sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
+ if (Math.random > 0.75)
+ throw new Exception("Testing exception handling")
+ }
+ }
+}