From fa9df4a45daf5fd8b19df20c1fb7466bde3b2054 Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Wed, 12 Dec 2012 23:39:10 -0800
Subject: Normalize executor exit statuses and report them to the user.

---
 core/src/main/scala/spark/executor/Executor.scala  |  9 +++--
 .../scala/spark/executor/ExecutorExitCode.scala    | 40 ++++++++++++++++++++++
 .../spark/scheduler/cluster/ClusterScheduler.scala |  3 +-
 .../scheduler/cluster/ExecutorLostReason.scala     | 21 ++++++++++++
 .../cluster/SparkDeploySchedulerBackend.scala      | 10 +++++-
 .../cluster/StandaloneSchedulerBackend.scala       |  2 +-
 .../scheduler/mesos/MesosSchedulerBackend.scala    | 16 ++++++---
 core/src/main/scala/spark/storage/DiskStore.scala  |  4 ++-
 8 files changed, 94 insertions(+), 11 deletions(-)
 create mode 100644 core/src/main/scala/spark/executor/ExecutorExitCode.scala
 create mode 100644 core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala

(limited to 'core')

diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index cb29a6b8b4..2552958d27 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,9 +50,14 @@ private[spark] class Executor extends Logging {
         override def uncaughtException(thread: Thread, exception: Throwable) {
           try {
             logError("Uncaught exception in thread " + thread, exception)
-            System.exit(1)
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+            }
           } catch {
-            case t: Throwable => System.exit(2)
+            case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
+            case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
           }
         }
       }
diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
new file mode 100644
index 0000000000..7fdc3b1d34
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
@@ -0,0 +1,40 @@
+package spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures assuming that the cluster management framework can capture the exit codes (but
+ * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+  /** DiskStore failed to create a local temporary directory after many attempts. */
+  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+  def explainExitCode(exitCode: Int): String = {
+    exitCode match {
+      case UNCAUGHT_EXCEPTION => "Uncaught exception"
+      case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+      case OOM => "OutOfMemoryError"
+      case DISK_STORE_FAILED_TO_CREATE_DIR =>
+        "Failed to create local directory (bad spark.local.dir?)"
+      case _ =>
+        "Unknown executor exit code (" + exitCode + ")" + (
+          if (exitCode > 128)
+            " (died from signal " + (exitCode - 128) + "?)"
+          else
+            ""
+        )
+    }
+  }
+}
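Reviewer note, not part of the patch: a minimal sketch of how explainExitCode above turns a raw exit status into user-readable text. The object name ExitCodeDemo and the sample statuses 137 and 1 are invented here to exercise the fallback branch; the demo sits in the spark package because ExecutorExitCode is private[spark].

package spark  // ExecutorExitCode above is private[spark]

import spark.executor.ExecutorExitCode

object ExitCodeDemo {
  def main(args: Array[String]) {
    // 50 and 52 are constants defined in ExecutorExitCode; 137 and 1 hit the catch-all case.
    for (code <- Seq(ExecutorExitCode.UNCAUGHT_EXCEPTION, ExecutorExitCode.OOM, 137, 1)) {
      println("exit " + code + ": " + ExecutorExitCode.explainExitCode(code))
    }
    // Expected lines include:
    //   exit 52: OutOfMemoryError
    //   exit 137: Unknown executor exit code (137) (died from signal 9?)
  }
}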
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index f5e852d203..d160379b14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }

-  def slaveLost(slaveId: String) {
+  def slaveLost(slaveId: String, reason: ExecutorLostReason) {
     var failedHost: Option[String] = None
     synchronized {
       val host = slaveIdToHost(slaveId)
@@ -261,6 +261,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
     if (failedHost != None) {
+      logError("Lost an executor on " + failedHost.get + ": " + reason)
       listener.hostLost(failedHost.get)
       backend.reviveOffers()
     }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala
new file mode 100644
index 0000000000..8976b3969d
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala
@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import spark.executor.ExecutorExitCode
+
+/**
+ * Represents an explanation for an executor or whole slave failing or exiting.
+ */
+private[spark]
+class ExecutorLostReason(val message: String) {
+  override def toString: String = message
+}
+
+private[spark]
+case class ExecutorExited(val exitCode: Int)
+    extends ExecutorLostReason(ExecutorExitCode.explainExitCode(exitCode)) {
+}
+
+private[spark]
+case class SlaveLost(_message: String = "Slave lost")
+    extends ExecutorLostReason(_message) {
+}
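Reviewer note, not part of the patch: a small sketch of what the two reason types above render as when the new logError call in ClusterScheduler.slaveLost prints them. Because ExecutorLostReason defines toString to return the message, an ExecutorExited reason should print the explainExitCode text directly. The object name, host name, and worker message below are invented for illustration.

package spark  // the reason classes above are private[spark]

import spark.executor.ExecutorExitCode
import spark.scheduler.cluster.{ExecutorExited, SlaveLost}

object LostReasonDemo {
  def main(args: Array[String]) {
    val byExitCode = ExecutorExited(ExecutorExitCode.OOM)
    val byMessage = SlaveLost("Worker disconnected")
    // Mirrors the new log line in ClusterScheduler.slaveLost:
    println("Lost an executor on host-1: " + byExitCode)  // ...: OutOfMemoryError
    println("Lost an executor on host-1: " + byMessage)   // ...: Worker disconnected
  }
}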
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 8f8ae9f409..f505628753 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -72,11 +72,19 @@ private[spark] class SparkDeploySchedulerBackend(
   }

   def executorRemoved(id: String, message: String) {
+    var reason: ExecutorLostReason = SlaveLost(message)
+    if (message.startsWith("Command exited with code ")) {
+      try {
+        reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
+      } catch {
+        case nfe: NumberFormatException => {}
+      }
+    }
     logInfo("Executor %s removed: %s".format(id, message))
     executorIdToSlaveId.get(id) match {
       case Some(slaveId) =>
         executorIdToSlaveId.remove(id)
-        scheduler.slaveLost(slaveId)
+        scheduler.slaveLost(slaveId, reason)
       case None =>
         logInfo("No slave ID known for executor %s".format(id))
     }
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d2cce0dc05..77f526cf4d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       freeCores -= slaveId
       slaveHost -= slaveId
       totalCoreCount.addAndGet(-numCores)
-      scheduler.slaveLost(slaveId)
+      scheduler.slaveLost(slaveId, SlaveLost())
     }
   }

diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 814443fa52..b0d4315f05 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -267,17 +267,23 @@ private[spark] class MesosSchedulerBackend(

   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLostReason) {
     logInfo("Mesos slave lost: " + slaveId.getValue)
     synchronized {
       slaveIdsWithExecutors -= slaveId.getValue
     }
-    scheduler.slaveLost(slaveId.getValue)
+    scheduler.slaveLost(slaveId.getValue, reason)
+  }
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    recordSlaveLost(d, slaveId, SlaveLost())
   }

-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
+  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+                            slaveId: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+                                                                 slaveId.getValue))
+    recordSlaveLost(d, slaveId, ExecutorExited(status))
   }

   // TODO: query Mesos for number of cores
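Reviewer note, not part of the patch: the message parsing that executorRemoved now does, pulled out as a standalone sketch. The object name LossReasonParsing, the helper name parseLossReason, and the sample messages are invented; the behavior mirrors the inline code added to SparkDeploySchedulerBackend above.

package spark  // the reason classes are private[spark]

import spark.scheduler.cluster.{ExecutorExited, ExecutorLostReason, SlaveLost}

object LossReasonParsing {
  // A recognizable exit-status message becomes ExecutorExited; anything else becomes SlaveLost.
  def parseLossReason(message: String): ExecutorLostReason = {
    val prefix = "Command exited with code "
    if (message.startsWith(prefix)) {
      try {
        return ExecutorExited(message.substring(prefix.length).toInt)
      } catch {
        case nfe: NumberFormatException => ()  // not a numeric status; fall through to SlaveLost
      }
    }
    SlaveLost(message)
  }

  def main(args: Array[String]) {
    println(parseLossReason("Command exited with code 52"))  // prints: OutOfMemoryError
    println(parseLossReason("Worker shut down"))             // prints: Worker shut down
  }
}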
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 8ba64e4b76..b5561479db 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

 import scala.collection.mutable.ArrayBuffer

+import spark.executor.ExecutorExitCode
+
 import spark.Utils

 /**
@@ -162,7 +164,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
       if (!foundLocalDir) {
         logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
           " attempts to create local dir in " + rootDir)
-        System.exit(1)
+        System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
       }
       logInfo("Created local directory at " + localDir)
       localDir
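Reviewer note, not part of the patch: tracing the DiskStore change above end to end, with the intermediate message inferred from the prefix the deploy backend parses and the host name invented. DiskStore exits with ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR (53); the executor's removal message then arrives in the form "Command exited with code 53"; SparkDeploySchedulerBackend turns that into ExecutorExited(53); and ClusterScheduler.slaveLost logs something like "Lost an executor on <host>: Failed to create local directory (bad spark.local.dir?)", so a misconfigured spark.local.dir is now visible to the user instead of a bare exit status 1.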