From 1d8e2e6cffdd63b736f26054d4657c399293913e Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Mon, 10 Dec 2012 21:40:09 -0800
Subject: Call slaveLost on executor death for standalone clusters.

---
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7aba7324ab..8f8ae9f409 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+  val executorIdToSlaveId = new HashMap[String, String]
 
   // Memory used by each executor (in megabytes)
   val executorMemory = {
@@ -65,9 +66,19 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+    executorIdToSlaveId += id -> workerId
     logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
       id, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  def executorRemoved(id: String, message: String) {}
+  def executorRemoved(id: String, message: String) {
+    logInfo("Executor %s removed: %s".format(id, message))
+    executorIdToSlaveId.get(id) match {
+      case Some(slaveId) =>
+        executorIdToSlaveId.remove(id)
+        scheduler.slaveLost(slaveId)
+      case None =>
+        logInfo("No slave ID known for executor %s".format(id))
+    }
+  }
 }
-- cgit v1.2.3

From fa9df4a45daf5fd8b19df20c1fb7466bde3b2054 Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Wed, 12 Dec 2012 23:39:10 -0800
Subject: Normalize executor exit statuses and report them to the user.

---
 core/src/main/scala/spark/executor/Executor.scala  |  9 +++--
 .../scala/spark/executor/ExecutorExitCode.scala    | 40 ++++++++++++++++++++++
 .../spark/scheduler/cluster/ClusterScheduler.scala |  3 +-
 .../scheduler/cluster/ExecutorLostReason.scala     | 21 ++++++++++++
 .../cluster/SparkDeploySchedulerBackend.scala      | 10 +++++-
 .../cluster/StandaloneSchedulerBackend.scala       |  2 +-
 .../scheduler/mesos/MesosSchedulerBackend.scala    | 16 ++++++---
 core/src/main/scala/spark/storage/DiskStore.scala  |  4 ++-
 8 files changed, 94 insertions(+), 11 deletions(-)
 create mode 100644 core/src/main/scala/spark/executor/ExecutorExitCode.scala
 create mode 100644 core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala

diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index cb29a6b8b4..2552958d27 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,9 +50,14 @@ private[spark] class Executor extends Logging {
       override def uncaughtException(thread: Thread, exception: Throwable) {
         try {
           logError("Uncaught exception in thread " + thread, exception)
-          System.exit(1)
+          if (exception.isInstanceOf[OutOfMemoryError]) {
+            System.exit(ExecutorExitCode.OOM)
+          } else {
+            System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+          }
         } catch {
-          case t: Throwable => System.exit(2)
+          case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
+          case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
     }
diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
new file mode 100644
index 0000000000..7fdc3b1d34
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
@@ -0,0 +1,40 @@
+package spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures assuming that cluster management framework can capture the exit codes (but
+ * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+  /** DiskStore failed to create a local temporary directory after many attempts. */
+  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+  def explainExitCode(exitCode: Int): String = {
+    exitCode match {
+      case UNCAUGHT_EXCEPTION => "Uncaught exception"
+      case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+      case OOM => "OutOfMemoryError"
+      case DISK_STORE_FAILED_TO_CREATE_DIR =>
+        "Failed to create local directory (bad spark.local.dir?)"
+      case _ =>
+        "Unknown executor exit code (" + exitCode + ")" + (
+          if (exitCode > 128)
+            " (died from signal " + (exitCode - 128) + "?)"
+          else
+            ""
+        )
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index f5e852d203..d160379b14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def slaveLost(slaveId: String) {
+  def slaveLost(slaveId: String, reason: ExecutorLostReason) {
     var failedHost: Option[String] = None
     synchronized {
       val host = slaveIdToHost(slaveId)
@@ -261,6 +261,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
     if (failedHost != None) {
+      logError("Lost an executor on " + failedHost.get + ": " + reason)
       listener.hostLost(failedHost.get)
       backend.reviveOffers()
     }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala
new file mode 100644
index 0000000000..8976b3969d
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala
@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import spark.executor.ExecutorExitCode
+
+/**
+ * Represents an explanation for a executor or whole slave failing or exiting.
+ */
+private[spark]
+class ExecutorLostReason(val message: String) {
+  override def toString: String = message
+}
+
+private[spark]
+case class ExecutorExited(val exitCode: Int)
+  extends ExecutorLostReason(ExecutorExitCode.explainExitCode(exitCode)) {
+}
+
+private[spark]
+case class SlaveLost(_message: String = "Slave lost")
+  extends ExecutorLostReason(_message) {
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 8f8ae9f409..f505628753 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -72,11 +72,19 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def executorRemoved(id: String, message: String) {
+    var reason: ExecutorLostReason = SlaveLost(message)
+    if (message.startsWith("Command exited with code ")) {
+      try {
+        reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
+      } catch {
+        case nfe: NumberFormatException => {}
+      }
+    }
     logInfo("Executor %s removed: %s".format(id, message))
     executorIdToSlaveId.get(id) match {
       case Some(slaveId) =>
         executorIdToSlaveId.remove(id)
-        scheduler.slaveLost(slaveId)
+        scheduler.slaveLost(slaveId, reason)
       case None =>
         logInfo("No slave ID known for executor %s".format(id))
     }
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d2cce0dc05..77f526cf4d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       freeCores -= slaveId
       slaveHost -= slaveId
       totalCoreCount.addAndGet(-numCores)
-      scheduler.slaveLost(slaveId)
+      scheduler.slaveLost(slaveId, SlaveLost())
     }
   }
 
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 814443fa52..b0d4315f05 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -267,17 +267,23 @@ private[spark] class MesosSchedulerBackend(
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLostReason) {
     logInfo("Mesos slave lost: " + slaveId.getValue)
     synchronized {
       slaveIdsWithExecutors -= slaveId.getValue
     }
-    scheduler.slaveLost(slaveId.getValue)
+    scheduler.slaveLost(slaveId.getValue, reason)
+  }
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    recordSlaveLost(d, slaveId, SlaveLost())
   }
 
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
+  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+                            slaveId: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+                                                                 slaveId.getValue))
+    recordSlaveLost(d, slaveId, ExecutorExited(status))
   }
 
   // TODO: query Mesos for number of cores
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 8ba64e4b76..b5561479db 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
 import scala.collection.mutable.ArrayBuffer
 
+import spark.executor.ExecutorExitCode
+
 import spark.Utils
 
 /**
@@ -162,7 +164,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     if (!foundLocalDir) {
       logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
         " attempts to create local dir in " + rootDir)
-      System.exit(1)
+      System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
     }
     logInfo("Created local directory at " + localDir)
     localDir
-- cgit v1.2.3

From a4041dd87f7b33b28de29ef0a4eebe33c7b0e6ca Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Thu, 13 Dec 2012 16:11:08 -0800
Subject: Log duplicate slaveLost() calls in ClusterScheduler.

---
 .../src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index d160379b14..ab200decb1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -254,14 +254,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     synchronized {
       val host = slaveIdToHost(slaveId)
       if (hostsAlive.contains(host)) {
+        logError("Lost an executor on " + host + ": " + reason)
         slaveIdsWithExecutors -= slaveId
         hostsAlive -= host
         activeTaskSetsQueue.foreach(_.hostLost(host))
         failedHost = Some(host)
+      } else {
+        // We may get multiple slaveLost() calls with different loss reasons. For example, one
+        // may be triggered by a dropped connection from the slave while another may be a report
+        // of executor termination from Mesos. We produce log messages for both so we eventually
+        // report the termination reason.
+ logError("Lost an executor on " + host + " (already removed): " + reason) } } if (failedHost != None) { - logError("Lost an executor on " + failedHost.get + ": " + reason) listener.hostLost(failedHost.get) backend.reviveOffers() } -- cgit v1.2.3 From 829206f1a73ad860fea17705c074ea43599ee66b Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 13 Dec 2012 16:11:28 -0800 Subject: Explain slaveLost calls made by StandaloneSchedulerBackend --- .../spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 77f526cf4d..eeaae23dc8 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -69,13 +69,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor context.stop(self) case Terminated(actor) => - actorToSlaveId.get(actor).foreach(removeSlave) + actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated")) case RemoteClientDisconnected(transport, address) => - addressToSlaveId.get(address).foreach(removeSlave) + addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected")) case RemoteClientShutdown(transport, address) => - addressToSlaveId.get(address).foreach(removeSlave) + addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown")) } // Make fake resource offers on all slaves @@ -99,7 +99,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } // Remove a disconnected slave from the cluster - def removeSlave(slaveId: String) { + def removeSlave(slaveId: String, reason: String) { logInfo("Slave " + slaveId + " disconnected, so removing it") val numCores = freeCores(slaveId) actorToSlaveId -= slaveActor(slaveId) @@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor freeCores -= slaveId slaveHost -= slaveId totalCoreCount.addAndGet(-numCores) - scheduler.slaveLost(slaveId, SlaveLost()) + scheduler.slaveLost(slaveId, SlaveLost(reason)) } } -- cgit v1.2.3 From 1948f46093d2934284daeae06cc2891541c39e68 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 14 Dec 2012 01:19:00 +0000 Subject: Use spark-env.sh to configure standalone master. See SPARK-638. Also fixed a typo in the standalone mode documentation. --- bin/start-all.sh | 4 ++-- bin/start-master.sh | 19 +++++++++++++++++-- bin/start-slave.sh | 1 - docs/spark-standalone.md | 2 +- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/bin/start-all.sh b/bin/start-all.sh index 9bd6c50654..b9891ad2f6 100755 --- a/bin/start-all.sh +++ b/bin/start-all.sh @@ -11,7 +11,7 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" # Start Master -"$bin"/start-master.sh --config $SPARK_CONF_DIR +"$bin"/start-master.sh # Start Workers -"$bin"/start-slaves.sh --config $SPARK_CONF_DIR \ No newline at end of file +"$bin"/start-slaves.sh diff --git a/bin/start-master.sh b/bin/start-master.sh index ad19d48331..a901b1c260 100755 --- a/bin/start-master.sh +++ b/bin/start-master.sh @@ -7,13 +7,28 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" +if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then + . 
"${SPARK_CONF_DIR}/spark-env.sh" +fi + +if [ "$SPARK_MASTER_PORT" = "" ]; then + SPARK_MASTER_PORT=7077 +fi + +if [ "$SPARK_MASTER_IP" = "" ]; then + SPARK_MASTER_IP=`hostname` +fi + +if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then + SPARK_MASTER_WEBUI_PORT=8080 +fi + # Set SPARK_PUBLIC_DNS so the master report the correct webUI address to the slaves if [ "$SPARK_PUBLIC_DNS" = "" ]; then # If we appear to be running on EC2, use the public address by default: if [[ `hostname` == *ec2.internal ]]; then - echo "RUNNING ON EC2" export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname` fi fi -"$bin"/spark-daemon.sh start spark.deploy.master.Master +"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT diff --git a/bin/start-slave.sh b/bin/start-slave.sh index 10cce9c17b..45a0cf7a6b 100755 --- a/bin/start-slave.sh +++ b/bin/start-slave.sh @@ -7,7 +7,6 @@ bin=`cd "$bin"; pwd` if [ "$SPARK_PUBLIC_DNS" = "" ]; then # If we appear to be running on EC2, use the public address by default: if [[ `hostname` == *ec2.internal ]]; then - echo "RUNNING ON EC2" export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname` fi fi diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ae630a0371..e0ba7c35cb 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -68,7 +68,7 @@ Finally, the following configuration options can be passed to the master and wor To launch a Spark standalone cluster with the deploy scripts, you need to set up two files, `conf/spark-env.sh` and `conf/slaves`. The `conf/spark-env.sh` file lets you specify global settings for the master and slave instances, such as memory, or port numbers to bind to, while `conf/slaves` is a list of slave nodes. The system requires that all the slave machines have the same configuration files, so *copy these files to each machine*. -In `conf/spark-env.sh`, you can set the following parameters, in addition to the [standard Spark configuration settongs](configuration.html): +In `conf/spark-env.sh`, you can set the following parameters, in addition to the [standard Spark configuration settings](configuration.html): -- cgit v1.2.3 From 24d7aa2d150ec7e20d4527c4223df183be8bb330 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 13 Dec 2012 18:39:23 -0800 Subject: Extra whitespace in ExecutorExitCode --- core/src/main/scala/spark/executor/ExecutorExitCode.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala index 7fdc3b1d34..fd76029cb3 100644 --- a/core/src/main/scala/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/spark/executor/ExecutorExitCode.scala @@ -12,12 +12,15 @@ private[spark] object ExecutorExitCode { /** The default uncaught exception handler was reached. */ val UNCAUGHT_EXCEPTION = 50 + /** The default uncaught exception handler was called and an exception was encountered while logging the exception. */ val UNCAUGHT_EXCEPTION_TWICE = 51 + /** The default uncaught exception handler was reached, and the uncaught exception was an OutOfMemoryError. */ val OOM = 52 + /** DiskStore failed to create a local temporary directory after many attempts. 
*/ val DISK_STORE_FAILED_TO_CREATE_DIR = 53 -- cgit v1.2.3 From b054d3b222e34792dbc9e40f14b4c04043b892e3 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 13 Dec 2012 18:44:07 -0800 Subject: ExecutorLostReason -> ExecutorLossReason --- .../spark/scheduler/cluster/ClusterScheduler.scala | 2 +- .../scheduler/cluster/ExecutorLossReason.scala | 21 +++++++++++++++++++++ .../scheduler/cluster/ExecutorLostReason.scala | 21 --------------------- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../scheduler/mesos/MesosSchedulerBackend.scala | 2 +- 5 files changed, 24 insertions(+), 24 deletions(-) create mode 100644 core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala delete mode 100644 core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index ab200decb1..20f6e65020 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -249,7 +249,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } - def slaveLost(slaveId: String, reason: ExecutorLostReason) { + def slaveLost(slaveId: String, reason: ExecutorLossReason) { var failedHost: Option[String] = None synchronized { val host = slaveIdToHost(slaveId) diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala new file mode 100644 index 0000000000..bba7de6a65 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala @@ -0,0 +1,21 @@ +package spark.scheduler.cluster + +import spark.executor.ExecutorExitCode + +/** + * Represents an explanation for a executor or whole slave failing or exiting. + */ +private[spark] +class ExecutorLossReason(val message: String) { + override def toString: String = message +} + +private[spark] +case class ExecutorExited(val exitCode: Int) + extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) { +} + +private[spark] +case class SlaveLost(_message: String = "Slave lost") + extends ExecutorLossReason(_message) { +} diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala deleted file mode 100644 index 8976b3969d..0000000000 --- a/core/src/main/scala/spark/scheduler/cluster/ExecutorLostReason.scala +++ /dev/null @@ -1,21 +0,0 @@ -package spark.scheduler.cluster - -import spark.executor.ExecutorExitCode - -/** - * Represents an explanation for a executor or whole slave failing or exiting. 
- */
-private[spark]
-class ExecutorLostReason(val message: String) {
-  override def toString: String = message
-}
-
-private[spark]
-case class ExecutorExited(val exitCode: Int)
-  extends ExecutorLostReason(ExecutorExitCode.explainExitCode(exitCode)) {
-}
-
-private[spark]
-case class SlaveLost(_message: String = "Slave lost")
-  extends ExecutorLostReason(_message) {
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index f505628753..efaf2d330c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -72,7 +72,7 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def executorRemoved(id: String, message: String) {
-    var reason: ExecutorLostReason = SlaveLost(message)
+    var reason: ExecutorLossReason = SlaveLost(message)
     if (message.startsWith("Command exited with code ")) {
       try {
         reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index b0d4315f05..8c7a1dfbc0 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -267,7 +267,7 @@ private[spark] class MesosSchedulerBackend(
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
-  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLostReason) {
+  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
     logInfo("Mesos slave lost: " + slaveId.getValue)
     synchronized {
       slaveIdsWithExecutors -= slaveId.getValue
-- cgit v1.2.3
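
Editorial note on how the pieces in this series fit together: the executor encodes a failure as an exit status (ExecutorExitCode), the standalone master reports "Command exited with code N" when the executor process dies, and SparkDeploySchedulerBackend.executorRemoved parses that message back into an ExecutorExited loss reason that ClusterScheduler logs. The following self-contained Scala sketch mirrors that translation. It is an illustration only, not part of the patches; it does not depend on Spark, and the names ExitCodeDemo and lossReason are invented for the example.

// Illustrative sketch only -- mirrors the exit-code-to-reason translation added in this
// series. The object and method names are invented here and are not part of Spark.
object ExitCodeDemo {
  // A few of the constants defined by spark.executor.ExecutorExitCode in the patch above.
  val UNCAUGHT_EXCEPTION = 50
  val OOM = 52
  val DISK_STORE_FAILED_TO_CREATE_DIR = 53

  // Same shape as ExecutorExitCode.explainExitCode: map a numeric status to a readable string.
  def explainExitCode(exitCode: Int): String = exitCode match {
    case UNCAUGHT_EXCEPTION => "Uncaught exception"
    case OOM => "OutOfMemoryError"
    case DISK_STORE_FAILED_TO_CREATE_DIR => "Failed to create local directory (bad spark.local.dir?)"
    case _ => "Unknown executor exit code (" + exitCode + ")"
  }

  // Same shape as SparkDeploySchedulerBackend.executorRemoved: the standalone master only
  // passes a free-form message, so the exit status is recovered by parsing its prefix.
  def lossReason(message: String): String = {
    val prefix = "Command exited with code "
    if (message.startsWith(prefix)) {
      try {
        explainExitCode(message.substring(prefix.length).toInt)
      } catch {
        case _: NumberFormatException => message  // not a numeric status; keep the raw message
      }
    } else {
      message  // e.g. a dropped Akka connection carries no exit status to decode
    }
  }

  def main(args: Array[String]): Unit = {
    println(lossReason("Command exited with code 53"))     // Failed to create local directory ...
    println(lossReason("remote Akka client disconnected"))  // reported verbatim
  }
}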