From 35b6358a7c4e9558789577e07c1953c9008d3e9c Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Wed, 26 Oct 2011 21:07:17 +0000
Subject: Report errors in tasks to the driver via a Mesos status update

When a task throws an exception, the Spark executor previously just logged
it to a local file on the slave and exited. This commit causes Spark to also
report the exception back to the driver using a Mesos status update, so the
user doesn't have to look through a log file on the slave.

Here's what the reporting currently looks like:

# ./run spark.examples.ExceptionHandlingTest master@203.0.113.1:5050
[...]
11/10/26 21:04:13 INFO spark.SimpleJob: Lost TID 1 (task 0:1)
11/10/26 21:04:13 INFO spark.SimpleJob: Loss was due to java.lang.Exception: Testing exception handling
[...]
11/10/26 21:04:16 INFO spark.SparkContext: Job finished in 5.988547328 s
---
 core/src/main/scala/spark/Executor.scala              |  7 +++++++
 core/src/main/scala/spark/SimpleJob.scala              |  2 ++
 .../scala/spark/examples/ExceptionHandlingTest.scala   | 18 ++++++++++++++++++
 3 files changed, 27 insertions(+)
 create mode 100644 examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala

diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index e1256a229e..31ba122baf 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
               .build())
           }
           case t: Throwable => {
+            val reason = OtherFailure(t.toString())
+            d.sendStatusUpdate(TaskStatus.newBuilder()
+              .setTaskId(desc.getTaskId)
+              .setState(TaskState.TASK_FAILED)
+              .setData(ByteString.copyFrom(Utils.serialize(reason)))
+              .build())
+
             // TODO: Handle errors in tasks less dramatically
             logError("Exception in task ID " + tid, t)
             System.exit(1)
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index d982a75ba0..6a27f159c4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -229,6 +229,8 @@ extends Job(jobId) with Logging
             if (tasksFinished == numTasks)
               sched.jobFinished(this)
             return
+          case otherFailure: OtherFailure =>
+            logInfo("Loss was due to %s".format(otherFailure.message))
           case _ => {}
         }
       }
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
new file mode 100644
index 0000000000..46f658eab2
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark.SparkContext
+
+object ExceptionHandlingTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: ExceptionHandlingTest <master>")
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+    sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
+      if (Math.random > 0.75)
+        throw new Exception("Testing exception handling")
+    }
+  }
+}
--
cgit v1.2.3
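The core of the first patch is that the executor serializes a TaskEndReason into the data field of a TASK_FAILED Mesos status update, and SimpleJob deserializes that payload on the driver to log why the task was lost. The minimal sketch below illustrates only that serialize/deserialize round trip; the Mesos driver and TaskStatus plumbing are omitted, plain Java serialization stands in for Spark's Utils.serialize/Utils.deserialize helpers, and the object name StatusUpdateRoundTrip is invented for illustration.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Simplified stand-ins for the TaskEndReason hierarchy defined in DAGScheduler.scala
sealed trait TaskEndReason extends Serializable
case object Success extends TaskEndReason
case class OtherFailure(message: String) extends TaskEndReason

object StatusUpdateRoundTrip {
  // Stand-in for Utils.serialize: the bytes that would go into the TaskStatus data field
  def serialize(reason: TaskEndReason): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(reason)
    out.close()
    bytes.toByteArray
  }

  // Stand-in for Utils.deserialize: what the driver does with status.getData
  def deserialize(data: Array[Byte]): TaskEndReason = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[TaskEndReason] finally in.close()
  }

  def main(args: Array[String]) {
    // Executor side: a task threw, so report the reason instead of only logging it locally
    val t = new Exception("Testing exception handling")
    val data = serialize(OtherFailure(t.toString))

    // Driver side: decode the payload and log why the task was lost
    deserialize(data) match {
      case OtherFailure(message) => println("Loss was due to " + message)
      case _ => println("Task lost for an unknown reason")
    }
  }
}

Running this prints "Loss was due to java.lang.Exception: Testing exception handling", which is the same message SimpleJob logs in the driver output shown above.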
From ad4ebff42c1b738746b2b9ecfbb041b6d06e3e16 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Fri, 28 Oct 2011 06:10:04 +0000
Subject: Deduplicate exceptions when printing them

The first time they appear, exceptions are printed in full, including a
stack trace. After that, they are printed in abbreviated form. They are
periodically reprinted in full; the reprint interval defaults to 10 seconds
and is configurable using the property spark.logging.exceptionPrintInterval.
---
 core/src/main/scala/spark/DAGScheduler.scala |  1 +
 core/src/main/scala/spark/Executor.scala     |  2 +-
 core/src/main/scala/spark/SimpleJob.scala    | 39 ++++++++++++++++++++++++++--
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index 93cab9fb62..c9411f4208 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -19,6 +19,7 @@ case class CompletionEvent(task: DAGTask[_], reason: TaskEndReason, result: Any,
 sealed trait TaskEndReason
 case object Success extends TaskEndReason
 case class FetchFailed(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+case class ExceptionFailure(exception: Throwable) extends TaskEndReason
 case class OtherFailure(message: String) extends TaskEndReason
 
 /**
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 31ba122baf..15693fc95f 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -87,7 +87,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
               .build())
           }
           case t: Throwable => {
-            val reason = OtherFailure(t.toString())
+            val reason = ExceptionFailure(t)
             d.sendStatusUpdate(TaskStatus.newBuilder()
               .setTaskId(desc.getTaskId)
               .setState(TaskState.TASK_FAILED)
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 6a27f159c4..bf881fb2d4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -60,6 +60,15 @@ extends Job(jobId) with Logging
   var failed = false
   var causeOfFailure = ""
 
+  // How frequently to reprint duplicate exceptions in full, in milliseconds
+  val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+  // Map of recent exceptions (identified by string representation and
+  // top stack frame) to duplicate count (how many times the same
+  // exception has appeared) and time the full exception was
+  // printed. This should ideally be an LRU map that can drop old
+  // exceptions automatically.
+  val recentExceptions = HashMap[String, (Int, Long)]()
+
   // Add all our tasks to the pending lists. We do this in reverse order
   // of task index so that tasks with low indices get launched first.
   for (i <- (0 until numTasks).reverse) {
@@ -229,8 +238,34 @@ extends Job(jobId) with Logging
             if (tasksFinished == numTasks)
               sched.jobFinished(this)
             return
-          case otherFailure: OtherFailure =>
-            logInfo("Loss was due to %s".format(otherFailure.message))
+          case ef: ExceptionFailure =>
+            val key = ef.exception.toString
+            val now = System.currentTimeMillis
+            val (printFull, dupCount) =
+              if (recentExceptions.contains(key)) {
+                val (dupCount, printTime) = recentExceptions(key)
+                if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
+                  recentExceptions(key) = (0, now)
+                  (true, 0)
+                } else {
+                  recentExceptions(key) = (dupCount + 1, printTime)
+                  (false, dupCount + 1)
+                }
+              } else {
+                recentExceptions += Tuple(key, (0, now))
+                (true, 0)
+              }
+
+            if (printFull) {
+              val stackTrace =
+                for (elem <- ef.exception.getStackTrace)
+                yield "\tat %s".format(elem.toString)
+              logInfo("Loss was due to %s\n%s".format(
+                ef.exception.toString, stackTrace.mkString("\n")))
+            } else {
+              logInfo("Loss was due to %s [duplicate %d]".format(
+                ef.exception.toString, dupCount))
+            }
           case _ => {}
         }
       }
--
cgit v1.2.3
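The deduplication policy added to SimpleJob in the second patch can be read on its own: keep a map from an exception's string key to (duplicate count, time of last full print); print the first occurrence in full, print later occurrences as a one-line "[duplicate N]" message, and reprint in full once the configured interval has elapsed. A rough standalone sketch of that policy, assuming invented names (ExceptionDeduper, report) and println in place of Spark's logInfo:

import scala.collection.mutable.HashMap

// Standalone sketch of the throttling policy: print a given exception in full the
// first time (and again after the interval), otherwise as a short duplicate count.
class ExceptionDeduper(printIntervalMs: Long = 10000) {
  // exception key -> (duplicate count, time the exception was last printed in full)
  private val recentExceptions = HashMap[String, (Int, Long)]()

  def report(e: Throwable) {
    val key = e.toString
    val now = System.currentTimeMillis
    val (printFull, dupCount) = recentExceptions.get(key) match {
      case Some((_, printTime)) if now - printTime > printIntervalMs =>
        recentExceptions(key) = (0, now)               // interval elapsed: reprint in full
        (true, 0)
      case Some((count, printTime)) =>
        recentExceptions(key) = (count + 1, printTime) // still within the interval
        (false, count + 1)
      case None =>
        recentExceptions(key) = (0, now)               // first time this exception is seen
        (true, 0)
    }
    if (printFull) {
      val stackTrace = e.getStackTrace.map("\tat " + _).mkString("\n")
      println("Loss was due to %s\n%s".format(e, stackTrace))
    } else {
      println("Loss was due to %s [duplicate %d]".format(e, dupCount))
    }
  }
}

Calling report repeatedly with the same exception prints the full stack trace once, then "Loss was due to ... [duplicate 1]", "[duplicate 2]", and so on, until printIntervalMs has passed and the full trace is printed again.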