author     Reynold Xin <rxin@cs.berkeley.edu>    2012-12-13 22:06:47 -0800
committer  Reynold Xin <rxin@cs.berkeley.edu>    2012-12-13 22:06:47 -0800
commit     41e58a519ac107c118715b8b9701032dd3d9a2d2 (patch)
tree       c491c889e81aa50bbab6de879c349b09ce8d120d
parent     dc7d7fc28656948d94000e6532f1437d597cd4db (diff)
parent     05e225f988796d05efbd5f7bf7303bbefebf54f0 (diff)
download   spark-41e58a519ac107c118715b8b9701032dd3d9a2d2.tar.gz
           spark-41e58a519ac107c118715b8b9701032dd3d9a2d2.tar.bz2
           spark-41e58a519ac107c118715b8b9701032dd3d9a2d2.zip
Merge branch 'master' of github.com:mesos/spark into spark-633
-rwxr-xr-x  bin/start-all.sh                                                                4
-rwxr-xr-x  bin/start-master.sh                                                            19
-rwxr-xr-x  bin/start-slave.sh                                                              1
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala                               9
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorExitCode.scala                      43
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala              9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala           21
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  21
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala   10
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala          16
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala                               4
-rw-r--r--  docs/spark-standalone.md                                                        2
12 files changed, 138 insertions, 21 deletions
diff --git a/bin/start-all.sh b/bin/start-all.sh
index 9bd6c50654..b9891ad2f6 100755
--- a/bin/start-all.sh
+++ b/bin/start-all.sh
@@ -11,7 +11,7 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
# Start Master
-"$bin"/start-master.sh --config $SPARK_CONF_DIR
+"$bin"/start-master.sh
# Start Workers
-"$bin"/start-slaves.sh --config $SPARK_CONF_DIR \ No newline at end of file
+"$bin"/start-slaves.sh
diff --git a/bin/start-master.sh b/bin/start-master.sh
index ad19d48331..a901b1c260 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -7,13 +7,28 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
+if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+ . "${SPARK_CONF_DIR}/spark-env.sh"
+fi
+
+if [ "$SPARK_MASTER_PORT" = "" ]; then
+ SPARK_MASTER_PORT=7077
+fi
+
+if [ "$SPARK_MASTER_IP" = "" ]; then
+ SPARK_MASTER_IP=`hostname`
+fi
+
+if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
+ SPARK_MASTER_WEBUI_PORT=8080
+fi
+
# Set SPARK_PUBLIC_DNS so the master report the correct webUI address to the slaves
if [ "$SPARK_PUBLIC_DNS" = "" ]; then
# If we appear to be running on EC2, use the public address by default:
if [[ `hostname` == *ec2.internal ]]; then
- echo "RUNNING ON EC2"
export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
fi
fi
-"$bin"/spark-daemon.sh start spark.deploy.master.Master
+"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
diff --git a/bin/start-slave.sh b/bin/start-slave.sh
index 10cce9c17b..45a0cf7a6b 100755
--- a/bin/start-slave.sh
+++ b/bin/start-slave.sh
@@ -7,7 +7,6 @@ bin=`cd "$bin"; pwd`
if [ "$SPARK_PUBLIC_DNS" = "" ]; then
# If we appear to be running on EC2, use the public address by default:
if [[ `hostname` == *ec2.internal ]]; then
- echo "RUNNING ON EC2"
export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
fi
fi
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index cb29a6b8b4..2552958d27 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,9 +50,14 @@ private[spark] class Executor extends Logging {
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
- System.exit(1)
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+ }
} catch {
- case t: Throwable => System.exit(2)
+ case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
+ case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
}
diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
new file mode 100644
index 0000000000..fd76029cb3
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
@@ -0,0 +1,43 @@
+package spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures assuming that cluster management framework can capture the exit codes (but
+ * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+ /** The default uncaught exception handler was reached. */
+ val UNCAUGHT_EXCEPTION = 50
+
+ /** The default uncaught exception handler was called and an exception was encountered while
+ logging the exception. */
+ val UNCAUGHT_EXCEPTION_TWICE = 51
+
+ /** The default uncaught exception handler was reached, and the uncaught exception was an
+ OutOfMemoryError. */
+ val OOM = 52
+
+ /** DiskStore failed to create a local temporary directory after many attempts. */
+ val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+ def explainExitCode(exitCode: Int): String = {
+ exitCode match {
+ case UNCAUGHT_EXCEPTION => "Uncaught exception"
+ case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+ case OOM => "OutOfMemoryError"
+ case DISK_STORE_FAILED_TO_CREATE_DIR =>
+ "Failed to create local directory (bad spark.local.dir?)"
+ case _ =>
+ "Unknown executor exit code (" + exitCode + ")" + (
+ if (exitCode > 128)
+ " (died from signal " + (exitCode - 128) + "?)"
+ else
+ ""
+ )
+ }
+ }
+}
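To illustrate how the new explainExitCode maps status values to messages (this snippet is not part of the patch; the outputs follow directly from the constants above):

    // Expected results of ExecutorExitCode.explainExitCode, for illustration only
    import spark.executor.ExecutorExitCode

    ExecutorExitCode.explainExitCode(52)   // "OutOfMemoryError"
    ExecutorExitCode.explainExitCode(53)   // "Failed to create local directory (bad spark.local.dir?)"
    ExecutorExitCode.explainExitCode(137)  // "Unknown executor exit code (137) (died from signal 9?)"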
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index f5e852d203..20f6e65020 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,15 +249,22 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def slaveLost(slaveId: String) {
+ def slaveLost(slaveId: String, reason: ExecutorLossReason) {
var failedHost: Option[String] = None
synchronized {
val host = slaveIdToHost(slaveId)
if (hostsAlive.contains(host)) {
+ logError("Lost an executor on " + host + ": " + reason)
slaveIdsWithExecutors -= slaveId
hostsAlive -= host
activeTaskSetsQueue.foreach(_.hostLost(host))
failedHost = Some(host)
+ } else {
+ // We may get multiple slaveLost() calls with different loss reasons. For example, one
+ // may be triggered by a dropped connection from the slave while another may be a report
+ // of executor termination from Mesos. We produce log messages for both so we eventually
+ // report the termination reason.
+ logError("Lost an executor on " + host + " (already removed): " + reason)
}
}
if (failedHost != None) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
new file mode 100644
index 0000000000..bba7de6a65
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import spark.executor.ExecutorExitCode
+
+/**
+ * Represents an explanation for an executor or whole slave failing or exiting.
+ */
+private[spark]
+class ExecutorLossReason(val message: String) {
+ override def toString: String = message
+}
+
+private[spark]
+case class ExecutorExited(val exitCode: Int)
+ extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+}
+
+private[spark]
+case class SlaveLost(_message: String = "Slave lost")
+ extends ExecutorLossReason(_message) {
+}
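Callers can recover both the human-readable message and, for ExecutorExited, the raw status code by pattern matching; a minimal sketch (describeLoss is a hypothetical helper, not part of the patch):

    // Illustrative consumer of the loss-reason classes above
    import spark.scheduler.cluster.{ExecutorExited, ExecutorLossReason, SlaveLost}

    def describeLoss(reason: ExecutorLossReason): String = reason match {
      case ExecutorExited(code) => "executor exited with code " + code + ": " + reason.message
      case SlaveLost(msg)       => "slave lost: " + msg
      case other                => other.message
    }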
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7aba7324ab..efaf2d330c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+ val executorIdToSlaveId = new HashMap[String, String]
// Memory used by each executor (in megabytes)
val executorMemory = {
@@ -65,9 +66,27 @@ private[spark] class SparkDeploySchedulerBackend(
}
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+ executorIdToSlaveId += id -> workerId
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
id, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(id: String, message: String) {}
+ def executorRemoved(id: String, message: String) {
+ var reason: ExecutorLossReason = SlaveLost(message)
+ if (message.startsWith("Command exited with code ")) {
+ try {
+ reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
+ } catch {
+ case nfe: NumberFormatException => {}
+ }
+ }
+ logInfo("Executor %s removed: %s".format(id, message))
+ executorIdToSlaveId.get(id) match {
+ case Some(slaveId) =>
+ executorIdToSlaveId.remove(id)
+ scheduler.slaveLost(slaveId, reason)
+ case None =>
+ logInfo("No slave ID known for executor %s".format(id))
+ }
+ }
}
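The message parsing in executorRemoved above amounts to a small translation from the standalone master's free-form removal text into a typed loss reason; a standalone sketch with made-up inputs (toLossReason is a hypothetical helper, not the patch code):

    // Sketch of the message-to-reason translation mirrored from executorRemoved
    import spark.scheduler.cluster.{ExecutorExited, ExecutorLossReason, SlaveLost}

    def toLossReason(message: String): ExecutorLossReason = {
      val prefix = "Command exited with code "
      if (message.startsWith(prefix)) {
        try {
          ExecutorExited(message.substring(prefix.length).toInt)  // e.g. code 52 explains as "OutOfMemoryError"
        } catch {
          case nfe: NumberFormatException => SlaveLost(message)   // unparsable code: keep the raw text
        }
      } else {
        SlaveLost(message)
      }
    }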
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d2cce0dc05..eeaae23dc8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -69,13 +69,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
context.stop(self)
case Terminated(actor) =>
- actorToSlaveId.get(actor).foreach(removeSlave)
+ actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
case RemoteClientDisconnected(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave)
+ addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
case RemoteClientShutdown(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave)
+ addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all slaves
@@ -99,7 +99,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
// Remove a disconnected slave from the cluster
- def removeSlave(slaveId: String) {
+ def removeSlave(slaveId: String, reason: String) {
logInfo("Slave " + slaveId + " disconnected, so removing it")
val numCores = freeCores(slaveId)
actorToSlaveId -= slaveActor(slaveId)
@@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
freeCores -= slaveId
slaveHost -= slaveId
totalCoreCount.addAndGet(-numCores)
- scheduler.slaveLost(slaveId)
+ scheduler.slaveLost(slaveId, SlaveLost(reason))
}
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 814443fa52..8c7a1dfbc0 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -267,17 +267,23 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
- scheduler.slaveLost(slaveId.getValue)
+ scheduler.slaveLost(slaveId.getValue, reason)
+ }
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ recordSlaveLost(d, slaveId, SlaveLost())
}
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
+ override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+ slaveId: SlaveID, status: Int) {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+ slaveId.getValue))
+ recordSlaveLost(d, slaveId, ExecutorExited(status))
}
// TODO: query Mesos for number of cores
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 8ba64e4b76..b5561479db 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import scala.collection.mutable.ArrayBuffer
+import spark.executor.ExecutorExitCode
+
import spark.Utils
/**
@@ -162,7 +164,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
if (!foundLocalDir) {
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
" attempts to create local dir in " + rootDir)
- System.exit(1)
+ System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
logInfo("Created local directory at " + localDir)
localDir
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index ae630a0371..e0ba7c35cb 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -68,7 +68,7 @@ Finally, the following configuration options can be passed to the master and wor
To launch a Spark standalone cluster with the deploy scripts, you need to set up two files, `conf/spark-env.sh` and `conf/slaves`. The `conf/spark-env.sh` file lets you specify global settings for the master and slave instances, such as memory, or port numbers to bind to, while `conf/slaves` is a list of slave nodes. The system requires that all the slave machines have the same configuration files, so *copy these files to each machine*.
-In `conf/spark-env.sh`, you can set the following parameters, in addition to the [standard Spark configuration settongs](configuration.html):
+In `conf/spark-env.sh`, you can set the following parameters, in addition to the [standard Spark configuration settings](configuration.html):
<table class="table">
<tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>