diff options
Diffstat (limited to 'core/src/main/scala/spark/SimpleJob.scala')
-rw-r--r-- | core/src/main/scala/spark/SimpleJob.scala | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala index d982a75ba0..bf881fb2d4 100644 --- a/core/src/main/scala/spark/SimpleJob.scala +++ b/core/src/main/scala/spark/SimpleJob.scala @@ -60,6 +60,15 @@ extends Job(jobId) with Logging var failed = false var causeOfFailure = "" + // How frequently to reprint duplicate exceptions in full, in milliseconds + val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong + // Map of recent exceptions (identified by string representation and + // top stack frame) to duplicate count (how many times the same + // exception has appeared) and time the full exception was + // printed. This should ideally be an LRU map that can drop old + // exceptions automatically. + val recentExceptions = HashMap[String, (Int, Long)]() + // Add all our tasks to the pending lists. We do this in reverse order // of task index so that tasks with low indices get launched first. for (i <- (0 until numTasks).reverse) { @@ -229,6 +238,34 @@ extends Job(jobId) with Logging if (tasksFinished == numTasks) sched.jobFinished(this) return + case ef: ExceptionFailure => + val key = ef.exception.toString + val now = System.currentTimeMillis + val (printFull, dupCount) = + if (recentExceptions.contains(key)) { + val (dupCount, printTime) = recentExceptions(key) + if (now - printTime > EXCEPTION_PRINT_INTERVAL) { + recentExceptions(key) = (0, now) + (true, 0) + } else { + recentExceptions(key) = (dupCount + 1, printTime) + (false, dupCount + 1) + } + } else { + recentExceptions += Tuple(key, (0, now)) + (true, 0) + } + + if (printFull) { + val stackTrace = + for (elem <- ef.exception.getStackTrace) + yield "\tat %s".format(elem.toString) + logInfo("Loss was due to %s\n%s".format( + ef.exception.toString, stackTrace.mkString("\n"))) + } else { + logInfo("Loss was due to %s [duplicate %d]".format( + ef.exception.toString, dupCount)) + } case _ => {} } } |