author     Matei Zaharia <matei@eecs.berkeley.edu>  2011-12-14 18:19:43 +0100
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2011-12-14 18:19:43 +0100
commit     3034fc0d91955062f01a2d198200a3b84d2761bc (patch)
tree       b9cd4b0b232715816f3eda9355a531d421a44edb
parent     6a650cbbdf7d64d9a79d8dcfe7850e1d678413e0 (diff)
parent     ad4ebff42c1b738746b2b9ecfbb041b6d06e3e16 (diff)
Merge commit 'ad4ebff42c1b738746b2b9ecfbb041b6d06e3e16'
-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala                          1
-rw-r--r--  core/src/main/scala/spark/Executor.scala                              7
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala                            37
-rw-r--r--  examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala   18
4 files changed, 63 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index 93cab9fb62..c9411f4208 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -19,6 +19,7 @@ case class CompletionEvent(task: DAGTask[_], reason: TaskEndReason, result: Any,
sealed trait TaskEndReason
case object Success extends TaskEndReason
case class FetchFailed(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+case class ExceptionFailure(exception: Throwable) extends TaskEndReason
case class OtherFailure(message: String) extends TaskEndReason
/**
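
The new ExceptionFailure case lets a scheduler tell "the task body threw an exception" apart from a shuffle fetch failure or any other kind of loss. As a minimal sketch (the describe helper below is illustrative, not part of the patch), code that receives a TaskEndReason can now branch on all four cases:

    // Illustrative helper: one branch per TaskEndReason subtype defined above.
    def describe(reason: TaskEndReason): String = reason match {
      case Success =>
        "task finished normally"
      case FetchFailed(serverUri, shuffleId, mapId, reduceId) =>
        "failed to fetch shuffle " + shuffleId + " output from " + serverUri
      case ExceptionFailure(exception) =>
        "task threw " + exception
      case OtherFailure(message) =>
        message
    }
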
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index e1256a229e..15693fc95f 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
.build())
}
case t: Throwable => {
+  val reason = ExceptionFailure(t)
+  d.sendStatusUpdate(TaskStatus.newBuilder()
+    .setTaskId(desc.getTaskId)
+    .setState(TaskState.TASK_FAILED)
+    .setData(ByteString.copyFrom(Utils.serialize(reason)))
+    .build())
+
// TODO: Handle errors in tasks less dramatically
logError("Exception in task ID " + tid, t)
System.exit(1)
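
On the receiving side, the serialized reason travels in the data field of the Mesos TaskStatus. A sketch of how a scheduler could recover it, assuming Utils.deserialize is the counterpart of the Utils.serialize call above (the exact call site is not shown in this diff):

    // Illustrative: unpack the TaskEndReason attached to a failed task's status update.
    if (status.getData != null && status.getData.size > 0) {
      val reason = Utils.deserialize[TaskEndReason](status.getData.toByteArray)
      // reason is then matched against Success / FetchFailed / ExceptionFailure / OtherFailure
    }
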
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index d982a75ba0..bf881fb2d4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -60,6 +60,15 @@ extends Job(jobId) with Logging
var failed = false
var causeOfFailure = ""
+ // How frequently to reprint duplicate exceptions in full, in milliseconds
+ val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+ // Map of recent exceptions (identified by string representation and
+ // top stack frame) to duplicate count (how many times the same
+ // exception has appeared) and time the full exception was
+ // printed. This should ideally be an LRU map that can drop old
+ // exceptions automatically.
+ val recentExceptions = HashMap[String, (Int, Long)]()
+
// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
@@ -229,6 +238,34 @@ extends Job(jobId) with Logging
if (tasksFinished == numTasks)
sched.jobFinished(this)
return
+case ef: ExceptionFailure =>
+  val key = ef.exception.toString
+  val now = System.currentTimeMillis
+  val (printFull, dupCount) =
+    if (recentExceptions.contains(key)) {
+      val (dupCount, printTime) = recentExceptions(key)
+      if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
+        recentExceptions(key) = (0, now)
+        (true, 0)
+      } else {
+        recentExceptions(key) = (dupCount + 1, printTime)
+        (false, dupCount + 1)
+      }
+    } else {
+      recentExceptions(key) = (0, now)
+      (true, 0)
+    }
+
+  if (printFull) {
+    val stackTrace =
+      for (elem <- ef.exception.getStackTrace)
+        yield "\tat %s".format(elem.toString)
+    logInfo("Loss was due to %s\n%s".format(
+      ef.exception.toString, stackTrace.mkString("\n")))
+  } else {
+    logInfo("Loss was due to %s [duplicate %d]".format(
+      ef.exception.toString, dupCount))
+  }
case _ => {}
}
}
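
The comment added above notes that recentExceptions should ideally be an LRU map that drops old exceptions automatically. One way to get that behaviour on the JVM, sketched here as an assumption rather than part of the patch (the class name and maxEntries limit are made up), is a java.util.LinkedHashMap in access order with removeEldestEntry overridden:

    import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

    // Keeps at most maxEntries (exception string -> (duplicate count, last print time))
    // pairs, evicting the least recently accessed entry once the limit is exceeded.
    class LruExceptionMap(maxEntries: Int)
      extends JLinkedHashMap[String, (Int, Long)](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[String, (Int, Long)]): Boolean =
        size() > maxEntries
    }

    // Usage: val recentExceptions = new LruExceptionMap(100)
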
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
new file mode 100644
index 0000000000..46f658eab2
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark.SparkContext
+
+object ExceptionHandlingTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: ExceptionHandlingTest <host>")
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+    sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
+      if (Math.random > 0.75)
+        throw new Exception("Testing exception handling")
+    }
+  }
+}
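
Like the other programs under examples/, this one takes the scheduler URL as its first argument; roughly a quarter of the tasks then throw, which exercises the new ExceptionFailure path end to end. Assuming it is started the same way as the existing examples (not shown in this diff), the invocation would look something like ./run spark.examples.ExceptionHandlingTest <host>, where <host> is a Mesos master URL or a local[N] string.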