diff options
author | Ankur Dave <ankurdave@gmail.com> | 2011-10-26 21:07:17 +0000 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2011-11-14 01:54:53 +0000 |
commit | 35b6358a7c4e9558789577e07c1953c9008d3e9c (patch) | |
tree | 093262d22a10a948754193831ba69ca3e57404f2 | |
parent | 07532021fee9e2d27ee954b21c30830687478d8b (diff) | |
download | spark-35b6358a7c4e9558789577e07c1953c9008d3e9c.tar.gz spark-35b6358a7c4e9558789577e07c1953c9008d3e9c.tar.bz2 spark-35b6358a7c4e9558789577e07c1953c9008d3e9c.zip |
Report errors in tasks to the driver via a Mesos status update
When a task throws an exception, the Spark executor previously just
logged it to a local file on the slave and exited. This commit causes
Spark to also report the exception back to the driver using a Mesos
status update, so the user doesn't have to look through a log file on
the slave.
Here's what the reporting currently looks like:
# ./run spark.examples.ExceptionHandlingTest master@203.0.113.1:5050
[...]
11/10/26 21:04:13 INFO spark.SimpleJob: Lost TID 1 (task 0:1)
11/10/26 21:04:13 INFO spark.SimpleJob: Loss was due to java.lang.Exception: Testing exception handling
[...]
11/10/26 21:04:16 INFO spark.SparkContext: Job finished in 5.988547328 s
-rw-r--r-- | core/src/main/scala/spark/Executor.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/SimpleJob.scala | 2 | ||||
-rw-r--r-- | examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala | 18 |
3 files changed, 27 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index e1256a229e..31ba122baf 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging { .build()) } case t: Throwable => { + val reason = OtherFailure(t.toString()) + d.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(desc.getTaskId) + .setState(TaskState.TASK_FAILED) + .setData(ByteString.copyFrom(Utils.serialize(reason))) + .build()) + // TODO: Handle errors in tasks less dramatically logError("Exception in task ID " + tid, t) System.exit(1) diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala index d982a75ba0..6a27f159c4 100644 --- a/core/src/main/scala/spark/SimpleJob.scala +++ b/core/src/main/scala/spark/SimpleJob.scala @@ -229,6 +229,8 @@ extends Job(jobId) with Logging if (tasksFinished == numTasks) sched.jobFinished(this) return + case otherFailure: OtherFailure => + logInfo("Loss was due to %s".format(otherFailure.message)) case _ => {} } } diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala new file mode 100644 index 0000000000..46f658eab2 --- /dev/null +++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala @@ -0,0 +1,18 @@ +package spark.examples + +import spark.SparkContext + +object ExceptionHandlingTest { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: ExceptionHandlingTest <host>") + System.exit(1) + } + + val sc = new SparkContext(args(0), "ExceptionHandlingTest") + sc.parallelize(0 until sc.defaultParallelism).foreach { i => + if (Math.random > 0.75) + throw new Exception("Testing exception handling") + } + } +} |