aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTimothy Chen <tnachen@gmail.com>2015-04-28 13:31:08 -0700
committerAndrew Or <andrew@databricks.com>2015-04-28 13:33:57 -0700
commit53befacced828bbac53c6e3a4976ec3f036bae9e (patch)
treeedd8acc9ff5b60698dd579fd07b35ae977d83cfe
parent80098109d908b738b43d397e024756ff617d0af4 (diff)
downloadspark-53befacced828bbac53c6e3a4976ec3f036bae9e.tar.gz
spark-53befacced828bbac53c6e3a4976ec3f036bae9e.tar.bz2
spark-53befacced828bbac53c6e3a4976ec3f036bae9e.zip
[SPARK-5338] [MESOS] Add cluster mode support for Mesos
This patch adds the support for cluster mode to run on Mesos. It introduces a new Mesos framework dedicated to launch new apps/drivers, and can be called with the spark-submit script and specifying --master flag to the cluster mode REST interface instead of Mesos master. Example: ./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master mesos://10.0.0.206:8077 --executor-memory 1G --total-executor-cores 100 examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar 30 Part of this patch is also to abstract the StandaloneRestServer so it can have different implementations of the REST endpoints. Features of the cluster mode in this PR: - Supports supervise mode where scheduler will keep trying to reschedule exited job. - Adds a new UI for the cluster mode scheduler to see all the running jobs, finished jobs, and supervise jobs waiting to be retried - Supports state persistence to ZK, so when the cluster scheduler fails over it can pick up all the queued and running jobs Author: Timothy Chen <tnachen@gmail.com> Author: Luc Bourlier <luc.bourlier@typesafe.com> Closes #5144 from tnachen/mesos_cluster_mode and squashes the following commits: 069e946 [Timothy Chen] Fix rebase. e24b512 [Timothy Chen] Persist submitted driver. 390c491 [Timothy Chen] Fix zk conf key for mesos zk engine. e324ac1 [Timothy Chen] Fix merge. fd5259d [Timothy Chen] Address review comments. 1553230 [Timothy Chen] Address review comments. c6c6b73 [Timothy Chen] Pass spark properties to mesos cluster tasks. f7d8046 [Timothy Chen] Change app name to spark cluster. 17f93a2 [Timothy Chen] Fix head of line blocking in scheduling drivers. 6ff8e5c [Timothy Chen] Address comments and add logging. df355cd [Timothy Chen] Add metrics to mesos cluster scheduler. 20f7284 [Timothy Chen] Address review comments 7252612 [Timothy Chen] Fix tests. a46ad66 [Timothy Chen] Allow zk cli param override. 920fc4b [Timothy Chen] Fix scala style issues. 862b5b5 [Timothy Chen] Support asking driver status when it's retrying. 7f214c2 [Timothy Chen] Fix RetryState visibility e0f33f7 [Timothy Chen] Add supervise support and persist retries. 371ce65 [Timothy Chen] Handle cluster mode recovery and state persistence. 3d4dfa1 [Luc Bourlier] Adds support to kill submissions febfaba [Timothy Chen] Bound the finished drivers in memory 543a98d [Timothy Chen] Schedule multiple jobs 6887e5e [Timothy Chen] Support looking at SPARK_EXECUTOR_URI env variable in schedulers 8ec76bc [Timothy Chen] Fix Mesos dispatcher UI. d57d77d [Timothy Chen] Add documentation 825afa0 [Luc Bourlier] Supports more spark-submit parameters b8e7181 [Luc Bourlier] Adds a shutdown latch to keep the deamon running 0fa7780 [Luc Bourlier] Launch task through the mesos scheduler 5b7a12b [Timothy Chen] WIP: Making a cluster mode a mesos framework. 4b2f5ef [Timothy Chen] Specify user jar in command to be replaced with local. e775001 [Timothy Chen] Support fetching remote uris in driver runner. 7179495 [Timothy Chen] Change Driver page output and add logging 880bc27 [Timothy Chen] Add Mesos Cluster UI to display driver results 9986731 [Timothy Chen] Kill drivers when shutdown 67cbc18 [Timothy Chen] Rename StandaloneRestClient to RestClient and add sbin scripts e3facdd [Timothy Chen] Add Mesos Cluster dispatcher
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala (renamed from core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala)10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala116
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala101
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala65
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala114
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala (renamed from core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala)35
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala318
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala344
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala158
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala82
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala134
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala608
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala85
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala95
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala46
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala76
-rw-r--r--docs/running-on-mesos.md23
-rwxr-xr-xsbin/start-mesos-dispatcher.sh40
-rwxr-xr-xsbin/stop-mesos-dispatcher.sh27
30 files changed, 2147 insertions, 493 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index a7c89276a0..c048b78910 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods
import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
+import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.util.Utils
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
index 5b22481ea8..b8d3993540 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.deploy.master
+package org.apache.spark.deploy
import scala.collection.JavaConversions._
@@ -25,15 +25,17 @@ import org.apache.zookeeper.KeeperException
import org.apache.spark.{Logging, SparkConf}
-private[deploy] object SparkCuratorUtil extends Logging {
+private[spark] object SparkCuratorUtil extends Logging {
private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
private val ZK_SESSION_TIMEOUT_MILLIS = 60000
private val RETRY_WAIT_MILLIS = 5000
private val MAX_RECONNECT_ATTEMPTS = 3
- def newClient(conf: SparkConf): CuratorFramework = {
- val ZK_URL = conf.get("spark.deploy.zookeeper.url")
+ def newClient(
+ conf: SparkConf,
+ zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
+ val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 296a0764b8..f4f572e1e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -36,11 +36,11 @@ import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
-
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+
/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone cluster mode.
@@ -114,18 +114,20 @@ object SparkSubmit {
}
}
- /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
+ /**
+ * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+ */
private def kill(args: SparkSubmitArguments): Unit = {
- new StandaloneRestClient()
+ new RestSubmissionClient()
.killSubmission(args.master, args.submissionToKill)
}
/**
* Request the status of an existing submission using the REST protocol.
- * Standalone cluster mode only.
+ * Standalone and Mesos cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
- new StandaloneRestClient()
+ new RestSubmissionClient()
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
}
@@ -252,6 +254,7 @@ object SparkSubmit {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+ val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -294,8 +297,9 @@ object SparkSubmit {
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
- case (MESOS, CLUSTER) =>
- printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+ case (MESOS, CLUSTER) if args.isPython =>
+ printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+ "applications on Mesos clusters.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -377,15 +381,6 @@ object SparkSubmit {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),
- // Standalone cluster only
- // Do not set CL arguments here because there are multiple possibilities for the main class
- OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
- OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
- OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
- OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
- OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
- sysProp = "spark.driver.supervise"),
-
// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
@@ -413,7 +408,15 @@ object SparkSubmit {
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
- sysProp = "spark.files")
+ sysProp = "spark.files"),
+ OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
+ OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
+ sysProp = "spark.driver.memory"),
+ OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
+ sysProp = "spark.driver.cores"),
+ OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
+ sysProp = "spark.driver.supervise"),
+ OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)
// In client mode, launch the application main class directly
@@ -452,7 +455,7 @@ object SparkSubmit {
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
- childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+ childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
@@ -496,6 +499,15 @@ object SparkSubmit {
}
}
+ if (isMesosCluster) {
+ assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
+ childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+ childArgs += (args.primaryResource, args.mainClass)
+ if (args.childArgs != null) {
+ childArgs ++= args.childArgs
+ }
+ }
+
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c896842943..c621b8fc86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -241,8 +241,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
private def validateKillArguments(): Unit = {
- if (!master.startsWith("spark://")) {
- SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
+ if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+ SparkSubmit.printErrorAndExit(
+ "Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
@@ -250,9 +251,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
private def validateStatusRequestArguments(): Unit = {
- if (!master.startsWith("spark://")) {
+ if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
- "Requesting submission statuses is only supported in standalone mode!")
+ "Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
@@ -485,6 +486,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
+ |
+ | Spark standalone or Mesos with cluster deploy mode only:
| --supervise If given, restarts the driver on failure.
| --kill SUBMISSION_ID If given, kills the driver specified.
| --status SUBMISSION_ID If given, requests the status of the driver specified.
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ff2eed6dee..1c21c17956 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -130,7 +130,7 @@ private[master] class Master(
private val restServer =
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
- Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
+ Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
} else {
None
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 4823fd7cac..52758d6a7c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -23,6 +23,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
+import org.apache.spark.deploy.SparkCuratorUtil
private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index a285783f72..80db6d474b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkCuratorUtil
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
new file mode 100644
index 0000000000..5d4e5b899d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.deploy.mesos.ui.MesosClusterUI
+import org.apache.spark.deploy.rest.mesos.MesosRestServer
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.util.SignalLogger
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
+/*
+ * A dispatcher that is responsible for managing and launching drivers, and is intended to be
+ * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
+ * the cluster independently of Spark applications.
+ * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
+ * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
+ * for resources.
+ *
+ * A typical new driver lifecycle is the following:
+ * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
+ * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
+ * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
+ *
+ * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable
+ * per driver launched.
+ * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
+ * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
+ * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
+ */
+private[mesos] class MesosClusterDispatcher(
+ args: MesosClusterDispatcherArguments,
+ conf: SparkConf)
+ extends Logging {
+
+ private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
+ private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
+ logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
+
+ private val engineFactory = recoveryMode match {
+ case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
+ case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
+ case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
+ }
+
+ private val scheduler = new MesosClusterScheduler(engineFactory, conf)
+
+ private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
+ private val webUi = new MesosClusterUI(
+ new SecurityManager(conf),
+ args.webUiPort,
+ conf,
+ publicAddress,
+ scheduler)
+
+ private val shutdownLatch = new CountDownLatch(1)
+
+ def start(): Unit = {
+ webUi.bind()
+ scheduler.frameworkUrl = webUi.activeWebUiUrl
+ scheduler.start()
+ server.start()
+ }
+
+ def awaitShutdown(): Unit = {
+ shutdownLatch.await()
+ }
+
+ def stop(): Unit = {
+ webUi.stop()
+ server.stop()
+ scheduler.stop()
+ shutdownLatch.countDown()
+ }
+}
+
+private[mesos] object MesosClusterDispatcher extends Logging {
+ def main(args: Array[String]) {
+ SignalLogger.register(log)
+ val conf = new SparkConf
+ val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
+ conf.setMaster(dispatcherArgs.masterUrl)
+ conf.setAppName(dispatcherArgs.name)
+ dispatcherArgs.zookeeperUrl.foreach { z =>
+ conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
+ conf.set("spark.mesos.deploy.zookeeper.url", z)
+ }
+ val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
+ dispatcher.start()
+ val shutdownHook = new Thread() {
+ override def run() {
+ logInfo("Shutdown hook is shutting down dispatcher")
+ dispatcher.stop()
+ dispatcher.awaitShutdown()
+ }
+ }
+ Runtime.getRuntime.addShutdownHook(shutdownHook)
+ dispatcher.awaitShutdown()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
new file mode 100644
index 0000000000..894cb78d85
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{IntParam, Utils}
+
+
+private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
+ var host = Utils.localHostName()
+ var port = 7077
+ var name = "Spark Cluster"
+ var webUiPort = 8081
+ var masterUrl: String = _
+ var zookeeperUrl: Option[String] = None
+ var propertiesFile: String = _
+
+ parse(args.toList)
+
+ propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
+ private def parse(args: List[String]): Unit = args match {
+ case ("--host" | "-h") :: value :: tail =>
+ Utils.checkHost(value, "Please use hostname " + value)
+ host = value
+ parse(tail)
+
+ case ("--port" | "-p") :: IntParam(value) :: tail =>
+ port = value
+ parse(tail)
+
+ case ("--webui-port" | "-p") :: IntParam(value) :: tail =>
+ webUiPort = value
+ parse(tail)
+
+ case ("--zk" | "-z") :: value :: tail =>
+ zookeeperUrl = Some(value)
+ parse(tail)
+
+ case ("--master" | "-m") :: value :: tail =>
+ if (!value.startsWith("mesos://")) {
+ System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
+ System.exit(1)
+ }
+ masterUrl = value.stripPrefix("mesos://")
+ parse(tail)
+
+ case ("--name") :: value :: tail =>
+ name = value
+ parse(tail)
+
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parse(tail)
+
+ case ("--help") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil => {
+ if (masterUrl == null) {
+ System.err.println("--master is required")
+ printUsageAndExit(1)
+ }
+ }
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+
+ private def printUsageAndExit(exitCode: Int): Unit = {
+ System.err.println(
+ "Usage: MesosClusterDispatcher [options]\n" +
+ "\n" +
+ "Options:\n" +
+ " -h HOST, --host HOST Hostname to listen on\n" +
+ " -p PORT, --port PORT Port to listen on (default: 7077)\n" +
+ " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
+ " --name NAME Framework name to show in Mesos UI\n" +
+ " -m --master MASTER URI for connecting to Mesos master\n" +
+ " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" +
+ " Zookeeper for persistence\n" +
+ " --properties-file FILE Path to a custom Spark properties file.\n" +
+ " Default is conf/spark-defaults.conf.")
+ System.exit(exitCode)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
new file mode 100644
index 0000000000..1948226800
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.Date
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
+
+/**
+ * Describes a Spark driver that is submitted from the
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * @param jarUrl URL to the application jar
+ * @param mem Amount of memory for the driver
+ * @param cores Number of cores for the driver
+ * @param supervise Supervise the driver for long running app
+ * @param command The command to launch the driver.
+ * @param schedulerProperties Extra properties to pass the Mesos scheduler
+ */
+private[spark] class MesosDriverDescription(
+ val name: String,
+ val jarUrl: String,
+ val mem: Int,
+ val cores: Double,
+ val supervise: Boolean,
+ val command: Command,
+ val schedulerProperties: Map[String, String],
+ val submissionId: String,
+ val submissionDate: Date,
+ val retryState: Option[MesosClusterRetryState] = None)
+ extends Serializable {
+
+ def copy(
+ name: String = name,
+ jarUrl: String = jarUrl,
+ mem: Int = mem,
+ cores: Double = cores,
+ supervise: Boolean = supervise,
+ command: Command = command,
+ schedulerProperties: Map[String, String] = schedulerProperties,
+ submissionId: String = submissionId,
+ submissionDate: Date = submissionDate,
+ retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
+ new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
+ submissionId, submissionDate, retryState)
+ }
+
+ override def toString: String = s"MesosDriverDescription (${command.mainClass})"
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
new file mode 100644
index 0000000000..7b2005e0f1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.mesos.Protos.TaskStatus
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val state = parent.scheduler.getSchedulerState()
+ val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
+ val driverHeaders = queuedHeaders ++
+ Seq("Start Date", "Mesos Slave ID", "State")
+ val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
+ Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
+ val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
+ val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
+ val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
+ val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
+ val content =
+ <p>Mesos Framework ID: {state.frameworkId}</p>
+ <div class="row-fluid">
+ <div class="span12">
+ <h4>Queued Drivers:</h4>
+ {queuedTable}
+ <h4>Launched Drivers:</h4>
+ {launchedTable}
+ <h4>Finished Drivers:</h4>
+ {finishedTable}
+ <h4>Supervise drivers waiting for retry:</h4>
+ {retryTable}
+ </div>
+ </div>;
+ UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
+ }
+
+ private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
+ <tr>
+ <td>{submission.submissionId}</td>
+ <td>{submission.submissionDate}</td>
+ <td>{submission.command.mainClass}</td>
+ <td>cpus: {submission.cores}, mem: {submission.mem}</td>
+ </tr>
+ }
+
+ private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
+ <tr>
+ <td>{state.driverDescription.submissionId}</td>
+ <td>{state.driverDescription.submissionDate}</td>
+ <td>{state.driverDescription.command.mainClass}</td>
+ <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
+ <td>{state.startDate}</td>
+ <td>{state.slaveId.getValue}</td>
+ <td>{stateString(state.mesosTaskStatus)}</td>
+ </tr>
+ }
+
+ private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
+ <tr>
+ <td>{submission.submissionId}</td>
+ <td>{submission.submissionDate}</td>
+ <td>{submission.command.mainClass}</td>
+ <td>{submission.retryState.get.lastFailureStatus}</td>
+ <td>{submission.retryState.get.nextRetry}</td>
+ <td>{submission.retryState.get.retries}</td>
+ </tr>
+ }
+
+ private def stateString(status: Option[TaskStatus]): String = {
+ if (status.isEmpty) {
+ return ""
+ }
+ val sb = new StringBuilder
+ val s = status.get
+ sb.append(s"State: ${s.getState}")
+ if (status.get.hasMessage) {
+ sb.append(s", Message: ${s.getMessage}")
+ }
+ if (status.get.hasHealthy) {
+ sb.append(s", Healthy: ${s.getHealthy}")
+ }
+ if (status.get.hasSource) {
+ sb.append(s", Source: ${s.getSource}")
+ }
+ if (status.get.hasReason) {
+ sb.append(s", Reason: ${s.getReason}")
+ }
+ if (status.get.hasTimestamp) {
+ sb.append(s", Time: ${s.getTimestamp}")
+ }
+ sb.toString()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
new file mode 100644
index 0000000000..4865d46dbc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.{SparkUI, WebUI}
+
+/**
+ * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
+ */
+private[spark] class MesosClusterUI(
+ securityManager: SecurityManager,
+ port: Int,
+ conf: SparkConf,
+ dispatcherPublicAddress: String,
+ val scheduler: MesosClusterScheduler)
+ extends WebUI(securityManager, port, conf) {
+
+ initialize()
+
+ def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort
+
+ override def initialize() {
+ attachPage(new MesosClusterPage(this))
+ attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
+ }
+}
+
+private object MesosClusterUI {
+ val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index b8fd406fb6..307cebfb4b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -30,9 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils
/**
- * A client that submits applications to the standalone Master using a REST protocol.
- * This client is intended to communicate with the [[StandaloneRestServer]] and is
- * currently used for cluster mode only.
+ * A client that submits applications to a [[RestSubmissionServer]].
*
* In protocol version v1, the REST URL takes the form http://[host:port]/v1/submissions/[action],
* where [action] can be one of create, kill, or status. Each type of request is represented in
@@ -53,8 +51,10 @@ import org.apache.spark.util.Utils
* implementation of this client can use that information to retry using the version specified
* by the server.
*/
-private[deploy] class StandaloneRestClient extends Logging {
- import StandaloneRestClient._
+private[spark] class RestSubmissionClient extends Logging {
+ import RestSubmissionClient._
+
+ private val supportedMasterPrefixes = Seq("spark://", "mesos://")
/**
* Submit an application specified by the parameters in the provided request.
@@ -62,7 +62,7 @@ private[deploy] class StandaloneRestClient extends Logging {
* If the submission was successful, poll the status of the submission and report
* it to the user. Otherwise, report the error message provided by the server.
*/
- private[rest] def createSubmission(
+ def createSubmission(
master: String,
request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to launch an application in $master.")
@@ -107,7 +107,7 @@ private[deploy] class StandaloneRestClient extends Logging {
}
/** Construct a message that captures the specified parameters for submitting an application. */
- private[rest] def constructSubmitRequest(
+ def constructSubmitRequest(
appResource: String,
mainClass: String,
appArgs: Array[String],
@@ -219,14 +219,23 @@ private[deploy] class StandaloneRestClient extends Logging {
/** Return the base URL for communicating with the server, including the protocol version. */
private def getBaseUrl(master: String): String = {
- val masterUrl = master.stripPrefix("spark://").stripSuffix("/")
+ var masterUrl = master
+ supportedMasterPrefixes.foreach { prefix =>
+ if (master.startsWith(prefix)) {
+ masterUrl = master.stripPrefix(prefix)
+ }
+ }
+ masterUrl = masterUrl.stripSuffix("/")
s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
}
/** Throw an exception if this is not standalone mode. */
private def validateMaster(master: String): Unit = {
- if (!master.startsWith("spark://")) {
- throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
+ val valid = supportedMasterPrefixes.exists { prefix => master.startsWith(prefix) }
+ if (!valid) {
+ throw new IllegalArgumentException(
+ "This REST client only supports master URLs that start with " +
+ "one of the following: " + supportedMasterPrefixes.mkString(","))
}
}
@@ -295,7 +304,7 @@ private[deploy] class StandaloneRestClient extends Logging {
}
}
-private[rest] object StandaloneRestClient {
+private[spark] object RestSubmissionClient {
private val REPORT_DRIVER_STATUS_INTERVAL = 1000
private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
val PROTOCOL_VERSION = "v1"
@@ -315,7 +324,7 @@ private[rest] object StandaloneRestClient {
}
val sparkProperties = conf.getAll.toMap
val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
- val client = new StandaloneRestClient
+ val client = new RestSubmissionClient
val submitRequest = client.constructSubmitRequest(
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
client.createSubmission(master, submitRequest)
@@ -323,7 +332,7 @@ private[rest] object StandaloneRestClient {
def main(args: Array[String]): Unit = {
if (args.size < 2) {
- sys.error("Usage: StandaloneRestClient [app resource] [main class] [app args*]")
+ sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
sys.exit(1)
}
val appResource = args(0)
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
new file mode 100644
index 0000000000..2e78d03e5c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.net.InetSocketAddress
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+
+import scala.io.Source
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
+import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
+
+/**
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
+ *
+ * This server responds with different HTTP codes depending on the situation:
+ * 200 OK - Request was processed successfully
+ * 400 BAD REQUEST - Request was malformed, not successfully validated, or of unexpected type
+ * 468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this server does not understand
+ * 500 INTERNAL SERVER ERROR - Server throws an exception internally while processing the request
+ *
+ * The server always includes a JSON representation of the relevant [[SubmitRestProtocolResponse]]
+ * in the HTTP body. If an error occurs, however, the server will include an [[ErrorResponse]]
+ * instead of the one expected by the client. If the construction of this error response itself
+ * fails, the response will consist of an empty body with a response code that indicates internal
+ * server error.
+ */
+private[spark] abstract class RestSubmissionServer(
+ val host: String,
+ val requestedPort: Int,
+ val masterConf: SparkConf) extends Logging {
+ protected val submitRequestServlet: SubmitRequestServlet
+ protected val killRequestServlet: KillRequestServlet
+ protected val statusRequestServlet: StatusRequestServlet
+
+ private var _server: Option[Server] = None
+
+ // A mapping from URL prefixes to servlets that serve them. Exposed for testing.
+ protected val baseContext = s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
+ protected lazy val contextToServlet = Map[String, RestServlet](
+ s"$baseContext/create/*" -> submitRequestServlet,
+ s"$baseContext/kill/*" -> killRequestServlet,
+ s"$baseContext/status/*" -> statusRequestServlet,
+ "/*" -> new ErrorServlet // default handler
+ )
+
+ /** Start the server and return the bound port. */
+ def start(): Int = {
+ val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
+ _server = Some(server)
+ logInfo(s"Started REST server for submitting applications on port $boundPort")
+ boundPort
+ }
+
+ /**
+ * Map the servlets to their corresponding contexts and attach them to a server.
+ * Return a 2-tuple of the started server and the bound port.
+ */
+ private def doStart(startPort: Int): (Server, Int) = {
+ val server = new Server(new InetSocketAddress(host, startPort))
+ val threadPool = new QueuedThreadPool
+ threadPool.setDaemon(true)
+ server.setThreadPool(threadPool)
+ val mainHandler = new ServletContextHandler
+ mainHandler.setContextPath("/")
+ contextToServlet.foreach { case (prefix, servlet) =>
+ mainHandler.addServlet(new ServletHolder(servlet), prefix)
+ }
+ server.setHandler(mainHandler)
+ server.start()
+ val boundPort = server.getConnectors()(0).getLocalPort
+ (server, boundPort)
+ }
+
+ def stop(): Unit = {
+ _server.foreach(_.stop())
+ }
+}
+
+private[rest] object RestSubmissionServer {
+ val PROTOCOL_VERSION = RestSubmissionClient.PROTOCOL_VERSION
+ val SC_UNKNOWN_PROTOCOL_VERSION = 468
+}
+
+/**
+ * An abstract servlet for handling requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class RestServlet extends HttpServlet with Logging {
+
+ /**
+ * Serialize the given response message to JSON and send it through the response servlet.
+ * This validates the response before sending it to ensure it is properly constructed.
+ */
+ protected def sendResponse(
+ responseMessage: SubmitRestProtocolResponse,
+ responseServlet: HttpServletResponse): Unit = {
+ val message = validateResponse(responseMessage, responseServlet)
+ responseServlet.setContentType("application/json")
+ responseServlet.setCharacterEncoding("utf-8")
+ responseServlet.getWriter.write(message.toJson)
+ }
+
+ /**
+ * Return any fields in the client request message that the server does not know about.
+ *
+ * The mechanism for this is to reconstruct the JSON on the server side and compare the
+ * diff between this JSON and the one generated on the client side. Any fields that are
+ * only in the client JSON are treated as unexpected.
+ */
+ protected def findUnknownFields(
+ requestJson: String,
+ requestMessage: SubmitRestProtocolMessage): Array[String] = {
+ val clientSideJson = parse(requestJson)
+ val serverSideJson = parse(requestMessage.toJson)
+ val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
+ unknown match {
+ case j: JObject => j.obj.map { case (k, _) => k }.toArray
+ case _ => Array.empty[String] // No difference
+ }
+ }
+
+ /** Return a human readable String representation of the exception. */
+ protected def formatException(e: Throwable): String = {
+ val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
+ s"$e\n$stackTraceString"
+ }
+
+ /** Construct an error message to signal the fact that an exception has been thrown. */
+ protected def handleError(message: String): ErrorResponse = {
+ val e = new ErrorResponse
+ e.serverSparkVersion = sparkVersion
+ e.message = message
+ e
+ }
+
+ /**
+ * Parse a submission ID from the relative path, assuming it is the first part of the path.
+ * For instance, we expect the path to take the form /[submission ID]/maybe/something/else.
+ * The returned submission ID cannot be empty. If the path is unexpected, return None.
+ */
+ protected def parseSubmissionId(path: String): Option[String] = {
+ if (path == null || path.isEmpty) {
+ None
+ } else {
+ path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
+ }
+ }
+
+ /**
+ * Validate the response to ensure that it is correctly constructed.
+ *
+ * If it is, simply return the message as is. Otherwise, return an error response instead
+ * to propagate the exception back to the client and set the appropriate error code.
+ */
+ private def validateResponse(
+ responseMessage: SubmitRestProtocolResponse,
+ responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+ try {
+ responseMessage.validate()
+ responseMessage
+ } catch {
+ case e: Exception =>
+ responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
+ handleError("Internal server error: " + formatException(e))
+ }
+ }
+}
+
+/**
+ * A servlet for handling kill requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class KillRequestServlet extends RestServlet {
+
+ /**
+ * If a submission ID is specified in the URL, have the Master kill the corresponding
+ * driver and return an appropriate response to the client. Otherwise, return error.
+ */
+ protected override def doPost(
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ val submissionId = parseSubmissionId(request.getPathInfo)
+ val responseMessage = submissionId.map(handleKill).getOrElse {
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ handleError("Submission ID is missing in kill request.")
+ }
+ sendResponse(responseMessage, response)
+ }
+
+ protected def handleKill(submissionId: String): KillSubmissionResponse
+}
+
+/**
+ * A servlet for handling status requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class StatusRequestServlet extends RestServlet {
+
+ /**
+ * If a submission ID is specified in the URL, request the status of the corresponding
+ * driver from the Master and include it in the response. Otherwise, return error.
+ */
+ protected override def doGet(
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ val submissionId = parseSubmissionId(request.getPathInfo)
+ val responseMessage = submissionId.map(handleStatus).getOrElse {
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ handleError("Submission ID is missing in status request.")
+ }
+ sendResponse(responseMessage, response)
+ }
+
+ protected def handleStatus(submissionId: String): SubmissionStatusResponse
+}
+
+/**
+ * A servlet for handling submit requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class SubmitRequestServlet extends RestServlet {
+
+ /**
+ * Submit an application to the Master with parameters specified in the request.
+ *
+ * The request is assumed to be a [[SubmitRestProtocolRequest]] in the form of JSON.
+ * If the request is successfully processed, return an appropriate response to the
+ * client indicating so. Otherwise, return error instead.
+ */
+ protected override def doPost(
+ requestServlet: HttpServletRequest,
+ responseServlet: HttpServletResponse): Unit = {
+ val responseMessage =
+ try {
+ val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
+ val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
+ // The response should have already been validated on the client.
+ // In case this is not true, validate it ourselves to avoid potential NPEs.
+ requestMessage.validate()
+ handleSubmit(requestMessageJson, requestMessage, responseServlet)
+ } catch {
+ // The client failed to provide a valid JSON, so this is not our fault
+ case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
+ responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ handleError("Malformed request: " + formatException(e))
+ }
+ sendResponse(responseMessage, responseServlet)
+ }
+
+ protected def handleSubmit(
+ requestMessageJson: String,
+ requestMessage: SubmitRestProtocolMessage,
+ responseServlet: HttpServletResponse): SubmitRestProtocolResponse
+}
+
+/**
+ * A default servlet that handles error cases that are not captured by other servlets.
+ */
+private class ErrorServlet extends RestServlet {
+ private val serverVersion = RestSubmissionServer.PROTOCOL_VERSION
+
+ /** Service a faulty request by returning an appropriate error message to the client. */
+ protected override def service(
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ val path = request.getPathInfo
+ val parts = path.stripPrefix("/").split("/").filter(_.nonEmpty).toList
+ var versionMismatch = false
+ var msg =
+ parts match {
+ case Nil =>
+ // http://host:port/
+ "Missing protocol version."
+ case `serverVersion` :: Nil =>
+ // http://host:port/correct-version
+ "Missing the /submissions prefix."
+ case `serverVersion` :: "submissions" :: tail =>
+ // http://host:port/correct-version/submissions/*
+ "Missing an action: please specify one of /create, /kill, or /status."
+ case unknownVersion :: tail =>
+ // http://host:port/unknown-version/*
+ versionMismatch = true
+ s"Unknown protocol version '$unknownVersion'."
+ case _ =>
+ // never reached
+ s"Malformed path $path."
+ }
+ msg += s" Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
+ val error = handleError(msg)
+ // If there is a version mismatch, include the highest protocol version that
+ // this server supports in case the client wants to retry with our version
+ if (versionMismatch) {
+ error.highestProtocolVersion = serverVersion
+ response.setStatus(RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
+ } else {
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ }
+ sendResponse(error, response)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 2d6b8d4204..502b9bb701 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -18,26 +18,16 @@
package org.apache.spark.deploy.rest
import java.io.File
-import java.net.InetSocketAddress
-import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-
-import scala.io.Source
+import javax.servlet.http.HttpServletResponse
import akka.actor.ActorRef
-import com.fasterxml.jackson.core.JsonProcessingException
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
-import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
+import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
/**
- * A server that responds to requests submitted by the [[StandaloneRestClient]].
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
* This is intended to be embedded in the standalone Master and used in cluster mode only.
*
* This server responds with different HTTP codes depending on the situation:
@@ -54,173 +44,31 @@ import org.apache.spark.deploy.ClientArguments._
*
* @param host the address this server should bind to
* @param requestedPort the port this server will attempt to bind to
+ * @param masterConf the conf used by the Master
* @param masterActor reference to the Master actor to which requests can be sent
* @param masterUrl the URL of the Master new drivers will attempt to connect to
- * @param masterConf the conf used by the Master
*/
private[deploy] class StandaloneRestServer(
host: String,
requestedPort: Int,
+ masterConf: SparkConf,
masterActor: ActorRef,
- masterUrl: String,
- masterConf: SparkConf)
- extends Logging {
-
- import StandaloneRestServer._
-
- private var _server: Option[Server] = None
-
- // A mapping from URL prefixes to servlets that serve them. Exposed for testing.
- protected val baseContext = s"/$PROTOCOL_VERSION/submissions"
- protected val contextToServlet = Map[String, StandaloneRestServlet](
- s"$baseContext/create/*" -> new SubmitRequestServlet(masterActor, masterUrl, masterConf),
- s"$baseContext/kill/*" -> new KillRequestServlet(masterActor, masterConf),
- s"$baseContext/status/*" -> new StatusRequestServlet(masterActor, masterConf),
- "/*" -> new ErrorServlet // default handler
- )
-
- /** Start the server and return the bound port. */
- def start(): Int = {
- val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
- _server = Some(server)
- logInfo(s"Started REST server for submitting applications on port $boundPort")
- boundPort
- }
-
- /**
- * Map the servlets to their corresponding contexts and attach them to a server.
- * Return a 2-tuple of the started server and the bound port.
- */
- private def doStart(startPort: Int): (Server, Int) = {
- val server = new Server(new InetSocketAddress(host, startPort))
- val threadPool = new QueuedThreadPool
- threadPool.setDaemon(true)
- server.setThreadPool(threadPool)
- val mainHandler = new ServletContextHandler
- mainHandler.setContextPath("/")
- contextToServlet.foreach { case (prefix, servlet) =>
- mainHandler.addServlet(new ServletHolder(servlet), prefix)
- }
- server.setHandler(mainHandler)
- server.start()
- val boundPort = server.getConnectors()(0).getLocalPort
- (server, boundPort)
- }
-
- def stop(): Unit = {
- _server.foreach(_.stop())
- }
-}
-
-private[rest] object StandaloneRestServer {
- val PROTOCOL_VERSION = StandaloneRestClient.PROTOCOL_VERSION
- val SC_UNKNOWN_PROTOCOL_VERSION = 468
-}
-
-/**
- * An abstract servlet for handling requests passed to the [[StandaloneRestServer]].
- */
-private[rest] abstract class StandaloneRestServlet extends HttpServlet with Logging {
-
- /**
- * Serialize the given response message to JSON and send it through the response servlet.
- * This validates the response before sending it to ensure it is properly constructed.
- */
- protected def sendResponse(
- responseMessage: SubmitRestProtocolResponse,
- responseServlet: HttpServletResponse): Unit = {
- val message = validateResponse(responseMessage, responseServlet)
- responseServlet.setContentType("application/json")
- responseServlet.setCharacterEncoding("utf-8")
- responseServlet.getWriter.write(message.toJson)
- }
-
- /**
- * Return any fields in the client request message that the server does not know about.
- *
- * The mechanism for this is to reconstruct the JSON on the server side and compare the
- * diff between this JSON and the one generated on the client side. Any fields that are
- * only in the client JSON are treated as unexpected.
- */
- protected def findUnknownFields(
- requestJson: String,
- requestMessage: SubmitRestProtocolMessage): Array[String] = {
- val clientSideJson = parse(requestJson)
- val serverSideJson = parse(requestMessage.toJson)
- val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
- unknown match {
- case j: JObject => j.obj.map { case (k, _) => k }.toArray
- case _ => Array.empty[String] // No difference
- }
- }
-
- /** Return a human readable String representation of the exception. */
- protected def formatException(e: Throwable): String = {
- val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
- s"$e\n$stackTraceString"
- }
-
- /** Construct an error message to signal the fact that an exception has been thrown. */
- protected def handleError(message: String): ErrorResponse = {
- val e = new ErrorResponse
- e.serverSparkVersion = sparkVersion
- e.message = message
- e
- }
-
- /**
- * Parse a submission ID from the relative path, assuming it is the first part of the path.
- * For instance, we expect the path to take the form /[submission ID]/maybe/something/else.
- * The returned submission ID cannot be empty. If the path is unexpected, return None.
- */
- protected def parseSubmissionId(path: String): Option[String] = {
- if (path == null || path.isEmpty) {
- None
- } else {
- path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
- }
- }
-
- /**
- * Validate the response to ensure that it is correctly constructed.
- *
- * If it is, simply return the message as is. Otherwise, return an error response instead
- * to propagate the exception back to the client and set the appropriate error code.
- */
- private def validateResponse(
- responseMessage: SubmitRestProtocolResponse,
- responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
- try {
- responseMessage.validate()
- responseMessage
- } catch {
- case e: Exception =>
- responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
- handleError("Internal server error: " + formatException(e))
- }
- }
+ masterUrl: String)
+ extends RestSubmissionServer(host, requestedPort, masterConf) {
+
+ protected override val submitRequestServlet =
+ new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
+ protected override val killRequestServlet =
+ new StandaloneKillRequestServlet(masterActor, masterConf)
+ protected override val statusRequestServlet =
+ new StandaloneStatusRequestServlet(masterActor, masterConf)
}
/**
* A servlet for handling kill requests passed to the [[StandaloneRestServer]].
*/
-private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
- extends StandaloneRestServlet {
-
- /**
- * If a submission ID is specified in the URL, have the Master kill the corresponding
- * driver and return an appropriate response to the client. Otherwise, return error.
- */
- protected override def doPost(
- request: HttpServletRequest,
- response: HttpServletResponse): Unit = {
- val submissionId = parseSubmissionId(request.getPathInfo)
- val responseMessage = submissionId.map(handleKill).getOrElse {
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
- handleError("Submission ID is missing in kill request.")
- }
- sendResponse(responseMessage, response)
- }
+private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: SparkConf)
+ extends KillRequestServlet {
protected def handleKill(submissionId: String): KillSubmissionResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
@@ -238,23 +86,8 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
/**
* A servlet for handling status requests passed to the [[StandaloneRestServer]].
*/
-private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
- extends StandaloneRestServlet {
-
- /**
- * If a submission ID is specified in the URL, request the status of the corresponding
- * driver from the Master and include it in the response. Otherwise, return error.
- */
- protected override def doGet(
- request: HttpServletRequest,
- response: HttpServletResponse): Unit = {
- val submissionId = parseSubmissionId(request.getPathInfo)
- val responseMessage = submissionId.map(handleStatus).getOrElse {
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
- handleError("Submission ID is missing in status request.")
- }
- sendResponse(responseMessage, response)
- }
+private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
+ extends StatusRequestServlet {
protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
@@ -276,71 +109,11 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
/**
* A servlet for handling submit requests passed to the [[StandaloneRestServer]].
*/
-private[rest] class SubmitRequestServlet(
+private[rest] class StandaloneSubmitRequestServlet(
masterActor: ActorRef,
masterUrl: String,
conf: SparkConf)
- extends StandaloneRestServlet {
-
- /**
- * Submit an application to the Master with parameters specified in the request.
- *
- * The request is assumed to be a [[SubmitRestProtocolRequest]] in the form of JSON.
- * If the request is successfully processed, return an appropriate response to the
- * client indicating so. Otherwise, return error instead.
- */
- protected override def doPost(
- requestServlet: HttpServletRequest,
- responseServlet: HttpServletResponse): Unit = {
- val responseMessage =
- try {
- val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
- val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
- // The response should have already been validated on the client.
- // In case this is not true, validate it ourselves to avoid potential NPEs.
- requestMessage.validate()
- handleSubmit(requestMessageJson, requestMessage, responseServlet)
- } catch {
- // The client failed to provide a valid JSON, so this is not our fault
- case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
- responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
- handleError("Malformed request: " + formatException(e))
- }
- sendResponse(responseMessage, responseServlet)
- }
-
- /**
- * Handle the submit request and construct an appropriate response to return to the client.
- *
- * This assumes that the request message is already successfully validated.
- * If the request message is not of the expected type, return error to the client.
- */
- private def handleSubmit(
- requestMessageJson: String,
- requestMessage: SubmitRestProtocolMessage,
- responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
- requestMessage match {
- case submitRequest: CreateSubmissionRequest =>
- val askTimeout = RpcUtils.askTimeout(conf)
- val driverDescription = buildDriverDescription(submitRequest)
- val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
- DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
- val submitResponse = new CreateSubmissionResponse
- submitResponse.serverSparkVersion = sparkVersion
- submitResponse.message = response.message
- submitResponse.success = response.success
- submitResponse.submissionId = response.driverId.orNull
- val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
- if (unknownFields.nonEmpty) {
- // If there are fields that the server does not know about, warn the client
- submitResponse.unknownFields = unknownFields
- }
- submitResponse
- case unexpected =>
- responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
- handleError(s"Received message of unexpected type ${unexpected.messageType}.")
- }
- }
+ extends SubmitRequestServlet {
/**
* Build a driver description from the fields specified in the submit request.
@@ -389,50 +162,37 @@ private[rest] class SubmitRequestServlet(
new DriverDescription(
appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
}
-}
-/**
- * A default servlet that handles error cases that are not captured by other servlets.
- */
-private class ErrorServlet extends StandaloneRestServlet {
- private val serverVersion = StandaloneRestServer.PROTOCOL_VERSION
-
- /** Service a faulty request by returning an appropriate error message to the client. */
- protected override def service(
- request: HttpServletRequest,
- response: HttpServletResponse): Unit = {
- val path = request.getPathInfo
- val parts = path.stripPrefix("/").split("/").filter(_.nonEmpty).toList
- var versionMismatch = false
- var msg =
- parts match {
- case Nil =>
- // http://host:port/
- "Missing protocol version."
- case `serverVersion` :: Nil =>
- // http://host:port/correct-version
- "Missing the /submissions prefix."
- case `serverVersion` :: "submissions" :: tail =>
- // http://host:port/correct-version/submissions/*
- "Missing an action: please specify one of /create, /kill, or /status."
- case unknownVersion :: tail =>
- // http://host:port/unknown-version/*
- versionMismatch = true
- s"Unknown protocol version '$unknownVersion'."
- case _ =>
- // never reached
- s"Malformed path $path."
- }
- msg += s" Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
- val error = handleError(msg)
- // If there is a version mismatch, include the highest protocol version that
- // this server supports in case the client wants to retry with our version
- if (versionMismatch) {
- error.highestProtocolVersion = serverVersion
- response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
- } else {
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ /**
+ * Handle the submit request and construct an appropriate response to return to the client.
+ *
+ * This assumes that the request message is already successfully validated.
+ * If the request message is not of the expected type, return error to the client.
+ */
+ protected override def handleSubmit(
+ requestMessageJson: String,
+ requestMessage: SubmitRestProtocolMessage,
+ responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+ requestMessage match {
+ case submitRequest: CreateSubmissionRequest =>
+ val askTimeout = RpcUtils.askTimeout(conf)
+ val driverDescription = buildDriverDescription(submitRequest)
+ val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
+ DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
+ val submitResponse = new CreateSubmissionResponse
+ submitResponse.serverSparkVersion = sparkVersion
+ submitResponse.message = response.message
+ submitResponse.success = response.success
+ submitResponse.submissionId = response.driverId.orNull
+ val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+ if (unknownFields.nonEmpty) {
+ // If there are fields that the server does not know about, warn the client
+ submitResponse.unknownFields = unknownFields
+ }
+ submitResponse
+ case unexpected =>
+ responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ handleError(s"Received message of unexpected type ${unexpected.messageType}.")
}
- sendResponse(error, response)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
index d80abdf15f..0d50a76894 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
@@ -61,7 +61,7 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
assertProperty[Boolean](key, "boolean", _.toBoolean)
private def assertPropertyIsNumeric(key: String): Unit =
- assertProperty[Int](key, "numeric", _.toInt)
+ assertProperty[Double](key, "numeric", _.toDouble)
private def assertPropertyIsMemory(key: String): Unit =
assertProperty[Int](key, "memory", Utils.memoryStringToMb)
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
index 8fde8c142a..0e226ee294 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -35,7 +35,7 @@ private[rest] abstract class SubmitRestProtocolResponse extends SubmitRestProtoc
/**
* A response to a [[CreateSubmissionRequest]] in the REST application submission protocol.
*/
-private[rest] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
+private[spark] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
@@ -46,7 +46,7 @@ private[rest] class CreateSubmissionResponse extends SubmitRestProtocolResponse
/**
* A response to a kill request in the REST application submission protocol.
*/
-private[rest] class KillSubmissionResponse extends SubmitRestProtocolResponse {
+private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
@@ -58,7 +58,7 @@ private[rest] class KillSubmissionResponse extends SubmitRestProtocolResponse {
/**
* A response to a status request in the REST application submission protocol.
*/
-private[rest] class SubmissionStatusResponse extends SubmitRestProtocolResponse {
+private[spark] class SubmissionStatusResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
var driverState: String = null
var workerId: String = null
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
new file mode 100644
index 0000000000..fd17a980c9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest.mesos
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.concurrent.atomic.AtomicLong
+import javax.servlet.http.HttpServletResponse
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest._
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.util.Utils
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+
+
+/**
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
+ * All requests are forwarded to
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * This is intended to be used in Mesos cluster mode only.
+ * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs.
+ */
+private[spark] class MesosRestServer(
+ host: String,
+ requestedPort: Int,
+ masterConf: SparkConf,
+ scheduler: MesosClusterScheduler)
+ extends RestSubmissionServer(host, requestedPort, masterConf) {
+
+ protected override val submitRequestServlet =
+ new MesosSubmitRequestServlet(scheduler, masterConf)
+ protected override val killRequestServlet =
+ new MesosKillRequestServlet(scheduler, masterConf)
+ protected override val statusRequestServlet =
+ new MesosStatusRequestServlet(scheduler, masterConf)
+}
+
+private[deploy] class MesosSubmitRequestServlet(
+ scheduler: MesosClusterScheduler,
+ conf: SparkConf)
+ extends SubmitRequestServlet {
+
+ private val DEFAULT_SUPERVISE = false
+ private val DEFAULT_MEMORY = 512 // mb
+ private val DEFAULT_CORES = 1.0
+
+ private val nextDriverNumber = new AtomicLong(0)
+ private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
+ private def newDriverId(submitDate: Date): String = {
+ "driver-%s-%04d".format(
+ createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
+ }
+
+ /**
+ * Build a driver description from the fields specified in the submit request.
+ *
+ * This involves constructing a command that launches a mesos framework for the job.
+ * This does not currently consider fields used by python applications since python
+ * is not supported in mesos cluster mode yet.
+ */
+ private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
+ // Required fields, including the main class because python is not yet supported
+ val appResource = Option(request.appResource).getOrElse {
+ throw new SubmitRestMissingFieldException("Application jar is missing.")
+ }
+ val mainClass = Option(request.mainClass).getOrElse {
+ throw new SubmitRestMissingFieldException("Main class is missing.")
+ }
+
+ // Optional fields
+ val sparkProperties = request.sparkProperties
+ val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
+ val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
+ val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
+ val superviseDriver = sparkProperties.get("spark.driver.supervise")
+ val driverMemory = sparkProperties.get("spark.driver.memory")
+ val driverCores = sparkProperties.get("spark.driver.cores")
+ val appArgs = request.appArgs
+ val environmentVariables = request.environmentVariables
+ val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
+
+ // Construct driver description
+ val conf = new SparkConf(false).setAll(sparkProperties)
+ val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
+ val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
+ val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+ val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+ val javaOpts = sparkJavaOpts ++ extraJavaOpts
+ val command = new Command(
+ mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
+ val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+ val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+ val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
+ val submitDate = new Date()
+ val submissionId = newDriverId(submitDate)
+
+ new MesosDriverDescription(
+ name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
+ command, request.sparkProperties, submissionId, submitDate)
+ }
+
+ protected override def handleSubmit(
+ requestMessageJson: String,
+ requestMessage: SubmitRestProtocolMessage,
+ responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+ requestMessage match {
+ case submitRequest: CreateSubmissionRequest =>
+ val driverDescription = buildDriverDescription(submitRequest)
+ val s = scheduler.submitDriver(driverDescription)
+ s.serverSparkVersion = sparkVersion
+ val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+ if (unknownFields.nonEmpty) {
+ // If there are fields that the server does not know about, warn the client
+ s.unknownFields = unknownFields
+ }
+ s
+ case unexpected =>
+ responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ handleError(s"Received message of unexpected type ${unexpected.messageType}.")
+ }
+ }
+}
+
+private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+ extends KillRequestServlet {
+ protected override def handleKill(submissionId: String): KillSubmissionResponse = {
+ val k = scheduler.killDriver(submissionId)
+ k.serverSparkVersion = sparkVersion
+ k
+ }
+}
+
+private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+ extends StatusRequestServlet {
+ protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
+ val d = scheduler.getDriverStatus(submissionId)
+ d.serverSparkVersion = sparkVersion
+ d
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 82f652dae0..3412301e64 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,20 +18,17 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{List => JList}
-import java.util.Collections
+import java.util.{Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -49,17 +46,10 @@ private[spark] class CoarseMesosSchedulerBackend(
master: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with MScheduler
- with Logging {
+ with MesosSchedulerUtils {
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
@@ -87,26 +77,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def start() {
super.start()
-
- synchronized {
- new Thread("CoarseMesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try { {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- }
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
+ val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
+ startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
@@ -150,8 +122,10 @@ private[spark] class CoarseMesosSchedulerBackend(
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
- val uri = conf.get("spark.executor.uri", null)
- if (uri == null) {
+ val uri = conf.getOption("spark.executor.uri")
+ .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
+ if (uri.isEmpty) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
@@ -164,7 +138,7 @@ private[spark] class CoarseMesosSchedulerBackend(
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
+ val basename = uri.get.split('/').last.split('.').head
command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
@@ -173,7 +147,7 @@ private[spark] class CoarseMesosSchedulerBackend(
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
command.build()
}
@@ -183,18 +157,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
+ markRegistered()
}
override def disconnected(d: SchedulerDriver) {}
@@ -245,14 +208,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- private def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- 0
- }
-
/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
@@ -284,7 +239,8 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+ // In case we'd rejected everything before but have now lost a node
+ mesosDriver.reviveOffers()
}
}
}
@@ -296,8 +252,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def stop() {
super.stop()
- if (driver != null) {
- driver.stop()
+ if (mesosDriver != null) {
+ mesosDriver.stop()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
new file mode 100644
index 0000000000..3efc536f14
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConversions._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkCuratorUtil
+import org.apache.spark.util.Utils
+
+/**
+ * Persistence engine factory that is responsible for creating new persistence engines
+ * to store Mesos cluster mode state.
+ */
+private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
+ def createEngine(path: String): MesosClusterPersistenceEngine
+}
+
+/**
+ * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
+ * specific state, so that on failover all the state can be recovered and the scheduler
+ * can resume managing the drivers.
+ */
+private[spark] trait MesosClusterPersistenceEngine {
+ def persist(name: String, obj: Object): Unit
+ def expunge(name: String): Unit
+ def fetch[T](name: String): Option[T]
+ def fetchAll[T](): Iterable[T]
+}
+
+/**
+ * Zookeeper backed persistence engine factory.
+ * All Zk engines created from this factory shares the same Zookeeper client, so
+ * all of them reuses the same connection pool.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
+ extends MesosClusterPersistenceEngineFactory(conf) {
+
+ lazy val zk = SparkCuratorUtil.newClient(conf, "spark.mesos.deploy.zookeeper.url")
+
+ def createEngine(path: String): MesosClusterPersistenceEngine = {
+ new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
+ }
+}
+
+/**
+ * Black hole persistence engine factory that creates black hole
+ * persistence engines, which stores nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
+ extends MesosClusterPersistenceEngineFactory(null) {
+ def createEngine(path: String): MesosClusterPersistenceEngine = {
+ new BlackHoleMesosClusterPersistenceEngine
+ }
+}
+
+/**
+ * Black hole persistence engine that stores nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
+ override def persist(name: String, obj: Object): Unit = {}
+ override def fetch[T](name: String): Option[T] = None
+ override def expunge(name: String): Unit = {}
+ override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
+}
+
+/**
+ * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state
+ * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but
+ * reuses a shared Zookeeper client.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngine(
+ baseDir: String,
+ zk: CuratorFramework,
+ conf: SparkConf)
+ extends MesosClusterPersistenceEngine with Logging {
+ private val WORKING_DIR =
+ conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
+
+ SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+
+ def path(name: String): String = {
+ WORKING_DIR + "/" + name
+ }
+
+ override def expunge(name: String): Unit = {
+ zk.delete().forPath(path(name))
+ }
+
+ override def persist(name: String, obj: Object): Unit = {
+ val serialized = Utils.serialize(obj)
+ val zkPath = path(name)
+ zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
+ }
+
+ override def fetch[T](name: String): Option[T] = {
+ val zkPath = path(name)
+
+ try {
+ val fileData = zk.getData().forPath(zkPath)
+ Some(Utils.deserialize[T](fileData))
+ } catch {
+ case e: NoNodeException => None
+ case e: Exception => {
+ logWarning("Exception while reading persisted file, deleting", e)
+ zk.delete().forPath(zkPath)
+ None
+ }
+ }
+ }
+
+ override def fetchAll[T](): Iterable[T] = {
+ zk.getChildren.forPath(WORKING_DIR).map(fetch[T]).flatten
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
new file mode 100644
index 0000000000..0396e62be5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, Date, List => JList}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.Protos.Environment.Variable
+import org.apache.mesos.Protos.TaskStatus.Reason
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.{Scheduler, SchedulerDriver}
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.Utils
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+
+
+/**
+ * Tracks the current state of a Mesos Task that runs a Spark driver.
+ * @param driverDescription Submitted driver description from
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
+ * @param taskId Mesos TaskID generated for the task
+ * @param slaveId Slave ID that the task is assigned to
+ * @param mesosTaskStatus The last known task status update.
+ * @param startDate The date the task was launched
+ */
+private[spark] class MesosClusterSubmissionState(
+ val driverDescription: MesosDriverDescription,
+ val taskId: TaskID,
+ val slaveId: SlaveID,
+ var mesosTaskStatus: Option[TaskStatus],
+ var startDate: Date)
+ extends Serializable {
+
+ def copy(): MesosClusterSubmissionState = {
+ new MesosClusterSubmissionState(
+ driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
+ }
+}
+
+/**
+ * Tracks the retry state of a driver, which includes the next time it should be scheduled
+ * and necessary information to do exponential backoff.
+ * This class is not thread-safe, and we expect the caller to handle synchronizing state.
+ * @param lastFailureStatus Last Task status when it failed.
+ * @param retries Number of times it has been retried.
+ * @param nextRetry Time at which it should be retried next
+ * @param waitTime The amount of time driver is scheduled to wait until next retry.
+ */
+private[spark] class MesosClusterRetryState(
+ val lastFailureStatus: TaskStatus,
+ val retries: Int,
+ val nextRetry: Date,
+ val waitTime: Int) extends Serializable {
+ def copy(): MesosClusterRetryState =
+ new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime)
+}
+
+/**
+ * The full state of the cluster scheduler, currently being used for displaying
+ * information on the UI.
+ * @param frameworkId Mesos Framework id for the cluster scheduler.
+ * @param masterUrl The Mesos master url
+ * @param queuedDrivers All drivers queued to be launched
+ * @param launchedDrivers All launched or running drivers
+ * @param finishedDrivers All terminated drivers
+ * @param pendingRetryDrivers All drivers pending to be retried
+ */
+private[spark] class MesosClusterSchedulerState(
+ val frameworkId: String,
+ val masterUrl: Option[String],
+ val queuedDrivers: Iterable[MesosDriverDescription],
+ val launchedDrivers: Iterable[MesosClusterSubmissionState],
+ val finishedDrivers: Iterable[MesosClusterSubmissionState],
+ val pendingRetryDrivers: Iterable[MesosDriverDescription])
+
+/**
+ * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
+ * as Mesos tasks in a Mesos cluster.
+ * All drivers are launched asynchronously by the framework, which will eventually be launched
+ * by one of the slaves in the cluster. The results of the driver will be stored in slave's task
+ * sandbox which is accessible by visiting the Mesos UI.
+ * This scheduler supports recovery by persisting all its state and performs task reconciliation
+ * on recover, which gets all the latest state for all the drivers from Mesos master.
+ */
+private[spark] class MesosClusterScheduler(
+ engineFactory: MesosClusterPersistenceEngineFactory,
+ conf: SparkConf)
+ extends Scheduler with MesosSchedulerUtils {
+ var frameworkUrl: String = _
+ private val metricsSystem =
+ MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
+ private val master = conf.get("spark.master")
+ private val appName = conf.get("spark.app.name")
+ private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
+ private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
+ private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+ private val schedulerState = engineFactory.createEngine("scheduler")
+ private val stateLock = new ReentrantLock()
+ private val finishedDrivers =
+ new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
+ private var frameworkId: String = null
+ // Holds all the launched drivers and current launch state, keyed by driver id.
+ private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
+ // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
+ // All drivers that are loaded after failover are added here, as we need get the latest
+ // state of the tasks from Mesos.
+ private val pendingRecover = new mutable.HashMap[String, SlaveID]()
+ // Stores all the submitted drivers that hasn't been launched.
+ private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
+ // All supervised drivers that are waiting to retry after termination.
+ private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
+ private val queuedDriversState = engineFactory.createEngine("driverQueue")
+ private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
+ private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+ // Flag to mark if the scheduler is ready to be called, which is until the scheduler
+ // is registered with Mesos master.
+ @volatile protected var ready = false
+ private var masterInfo: Option[MasterInfo] = None
+
+ def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
+ val c = new CreateSubmissionResponse
+ if (!ready) {
+ c.success = false
+ c.message = "Scheduler is not ready to take requests"
+ return c
+ }
+
+ stateLock.synchronized {
+ if (isQueueFull()) {
+ c.success = false
+ c.message = "Already reached maximum submission size"
+ return c
+ }
+ c.submissionId = desc.submissionId
+ queuedDriversState.persist(desc.submissionId, desc)
+ queuedDrivers += desc
+ c.success = true
+ }
+ c
+ }
+
+ def killDriver(submissionId: String): KillSubmissionResponse = {
+ val k = new KillSubmissionResponse
+ if (!ready) {
+ k.success = false
+ k.message = "Scheduler is not ready to take requests"
+ return k
+ }
+ k.submissionId = submissionId
+ stateLock.synchronized {
+ // We look for the requested driver in the following places:
+ // 1. Check if submission is running or launched.
+ // 2. Check if it's still queued.
+ // 3. Check if it's in the retry list.
+ // 4. Check if it has already completed.
+ if (launchedDrivers.contains(submissionId)) {
+ val task = launchedDrivers(submissionId)
+ mesosDriver.killTask(task.taskId)
+ k.success = true
+ k.message = "Killing running driver"
+ } else if (removeFromQueuedDrivers(submissionId)) {
+ k.success = true
+ k.message = "Removed driver while it's still pending"
+ } else if (removeFromPendingRetryDrivers(submissionId)) {
+ k.success = true
+ k.message = "Removed driver while it's being retried"
+ } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+ k.success = false
+ k.message = "Driver already terminated"
+ } else {
+ k.success = false
+ k.message = "Cannot find driver"
+ }
+ }
+ k
+ }
+
+ def getDriverStatus(submissionId: String): SubmissionStatusResponse = {
+ val s = new SubmissionStatusResponse
+ if (!ready) {
+ s.success = false
+ s.message = "Scheduler is not ready to take requests"
+ return s
+ }
+ s.submissionId = submissionId
+ stateLock.synchronized {
+ if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
+ s.success = true
+ s.driverState = "QUEUED"
+ } else if (launchedDrivers.contains(submissionId)) {
+ s.success = true
+ s.driverState = "RUNNING"
+ launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
+ } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+ s.success = true
+ s.driverState = "FINISHED"
+ finishedDrivers
+ .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
+ .foreach(state => s.message = state.toString)
+ } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
+ val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+ .get.retryState.get.lastFailureStatus
+ s.success = true
+ s.driverState = "RETRYING"
+ s.message = status.toString
+ } else {
+ s.success = false
+ s.driverState = "NOT_FOUND"
+ }
+ }
+ s
+ }
+
+ private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
+
+ /**
+ * Recover scheduler state that is persisted.
+ * We still need to do task reconciliation to be up to date of the latest task states
+ * as it might have changed while the scheduler is failing over.
+ */
+ private def recoverState(): Unit = {
+ stateLock.synchronized {
+ launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
+ launchedDrivers(state.taskId.getValue) = state
+ pendingRecover(state.taskId.getValue) = state.slaveId
+ }
+ queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
+ // There is potential timing issue where a queued driver might have been launched
+ // but the scheduler shuts down before the queued driver was able to be removed
+ // from the queue. We try to mitigate this issue by walking through all queued drivers
+ // and remove if they're already launched.
+ queuedDrivers
+ .filter(d => launchedDrivers.contains(d.submissionId))
+ .foreach(d => removeFromQueuedDrivers(d.submissionId))
+ pendingRetryDriversState.fetchAll[MesosDriverDescription]()
+ .foreach(s => pendingRetryDrivers += s)
+ // TODO: Consider storing finished drivers so we can show them on the UI after
+ // failover. For now we clear the history on each recovery.
+ finishedDrivers.clear()
+ }
+ }
+
+ /**
+ * Starts the cluster scheduler and wait until the scheduler is registered.
+ * This also marks the scheduler to be ready for requests.
+ */
+ def start(): Unit = {
+ // TODO: Implement leader election to make sure only one framework running in the cluster.
+ val fwId = schedulerState.fetch[String]("frameworkId")
+ val builder = FrameworkInfo.newBuilder()
+ .setUser(Utils.getCurrentUserName())
+ .setName(appName)
+ .setWebuiUrl(frameworkUrl)
+ .setCheckpoint(true)
+ .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
+ fwId.foreach { id =>
+ builder.setId(FrameworkID.newBuilder().setValue(id).build())
+ frameworkId = id
+ }
+ recoverState()
+ metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
+ metricsSystem.start()
+ startScheduler(master, MesosClusterScheduler.this, builder.build())
+ ready = true
+ }
+
+ def stop(): Unit = {
+ ready = false
+ metricsSystem.report()
+ metricsSystem.stop()
+ mesosDriver.stop(true)
+ }
+
+ override def registered(
+ driver: SchedulerDriver,
+ newFrameworkId: FrameworkID,
+ masterInfo: MasterInfo): Unit = {
+ logInfo("Registered as framework ID " + newFrameworkId.getValue)
+ if (newFrameworkId.getValue != frameworkId) {
+ frameworkId = newFrameworkId.getValue
+ schedulerState.persist("frameworkId", frameworkId)
+ }
+ markRegistered()
+
+ stateLock.synchronized {
+ this.masterInfo = Some(masterInfo)
+ if (!pendingRecover.isEmpty) {
+ // Start task reconciliation if we need to recover.
+ val statuses = pendingRecover.collect {
+ case (taskId, slaveId) =>
+ val newStatus = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId).build())
+ .setSlaveId(slaveId)
+ .setState(MesosTaskState.TASK_STAGING)
+ .build()
+ launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
+ .getOrElse(newStatus)
+ }
+ // TODO: Page the status updates to avoid trying to reconcile
+ // a large amount of tasks at once.
+ driver.reconcileTasks(statuses)
+ }
+ }
+ }
+
+ private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
+ val appJar = CommandInfo.URI.newBuilder()
+ .setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
+ val builder = CommandInfo.newBuilder().addUris(appJar)
+ val entries =
+ (conf.getOption("spark.executor.extraLibraryPath").toList ++
+ desc.command.libraryPathEntries)
+ val prefixEnv = if (!entries.isEmpty) {
+ Utils.libraryPathEnvPrefix(entries)
+ } else {
+ ""
+ }
+ val envBuilder = Environment.newBuilder()
+ desc.command.environment.foreach { case (k, v) =>
+ envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v).build())
+ }
+ // Pass all spark properties to executor.
+ val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+ envBuilder.addVariables(
+ Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
+ val cmdOptions = generateCmdOption(desc)
+ val executorUri = desc.schedulerProperties.get("spark.executor.uri")
+ .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+ val appArguments = desc.command.arguments.mkString(" ")
+ val cmd = if (executorUri.isDefined) {
+ builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
+ val folderBasename = executorUri.get.split('/').last.split('.').head
+ val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
+ val cmdJar = s"../${desc.jarUrl.split("/").last}"
+ s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+ } else {
+ val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
+ .orElse(conf.getOption("spark.home"))
+ .orElse(Option(System.getenv("SPARK_HOME")))
+ .getOrElse {
+ throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+ }
+ val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
+ val cmdJar = desc.jarUrl.split("/").last
+ s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+ }
+ builder.setValue(cmd)
+ builder.setEnvironment(envBuilder.build())
+ builder.build()
+ }
+
+ private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
+ var options = Seq(
+ "--name", desc.schedulerProperties("spark.app.name"),
+ "--class", desc.command.mainClass,
+ "--master", s"mesos://${conf.get("spark.master")}",
+ "--driver-cores", desc.cores.toString,
+ "--driver-memory", s"${desc.mem}M")
+ desc.schedulerProperties.get("spark.executor.memory").map { v =>
+ options ++= Seq("--executor-memory", v)
+ }
+ desc.schedulerProperties.get("spark.cores.max").map { v =>
+ options ++= Seq("--total-executor-cores", v)
+ }
+ options
+ }
+
+ private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+ override def toString(): String = {
+ s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+ }
+ }
+
+ /**
+ * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
+ * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
+ * logic on each task.
+ */
+ private def scheduleTasks(
+ candidates: Seq[MesosDriverDescription],
+ afterLaunchCallback: (String) => Boolean,
+ currentOffers: List[ResourceOffer],
+ tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
+ for (submission <- candidates) {
+ val driverCpu = submission.cores
+ val driverMem = submission.mem
+ logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
+ val offerOption = currentOffers.find { o =>
+ o.cpu >= driverCpu && o.mem >= driverMem
+ }
+ if (offerOption.isEmpty) {
+ logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
+ s"cpu: $driverCpu, mem: $driverMem")
+ } else {
+ val offer = offerOption.get
+ offer.cpu -= driverCpu
+ offer.mem -= driverMem
+ val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
+ val cpuResource = Resource.newBuilder()
+ .setName("cpus").setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
+ val memResource = Resource.newBuilder()
+ .setName("mem").setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+ val commandInfo = buildDriverCommand(submission)
+ val appName = submission.schedulerProperties("spark.app.name")
+ val taskInfo = TaskInfo.newBuilder()
+ .setTaskId(taskId)
+ .setName(s"Driver for $appName")
+ .setSlaveId(offer.offer.getSlaveId)
+ .setCommand(commandInfo)
+ .addResources(cpuResource)
+ .addResources(memResource)
+ .build()
+ val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+ queuedTasks += taskInfo
+ logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+ submission.submissionId)
+ val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+ None, new Date())
+ launchedDrivers(submission.submissionId) = newState
+ launchedDriversState.persist(submission.submissionId, newState)
+ afterLaunchCallback(submission.submissionId)
+ }
+ }
+ }
+
+ override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
+ val currentOffers = offers.map { o =>
+ new ResourceOffer(
+ o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
+ }.toList
+ logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+ val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
+ val currentTime = new Date()
+
+ stateLock.synchronized {
+ // We first schedule all the supervised drivers that are ready to retry.
+ // This list will be empty if none of the drivers are marked as supervise.
+ val driversToRetry = pendingRetryDrivers.filter { d =>
+ d.retryState.get.nextRetry.before(currentTime)
+ }
+ scheduleTasks(
+ driversToRetry,
+ removeFromPendingRetryDrivers,
+ currentOffers,
+ tasks)
+ // Then we walk through the queued drivers and try to schedule them.
+ scheduleTasks(
+ queuedDrivers,
+ removeFromQueuedDrivers,
+ currentOffers,
+ tasks)
+ }
+ tasks.foreach { case (offerId, tasks) =>
+ driver.launchTasks(Collections.singleton(offerId), tasks)
+ }
+ offers
+ .filter(o => !tasks.keySet.contains(o.getId))
+ .foreach(o => driver.declineOffer(o.getId))
+ }
+
+ def getSchedulerState(): MesosClusterSchedulerState = {
+ def copyBuffer(
+ buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
+ val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+ buffer.copyToBuffer(newBuffer)
+ newBuffer
+ }
+ stateLock.synchronized {
+ new MesosClusterSchedulerState(
+ frameworkId,
+ masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"),
+ copyBuffer(queuedDrivers),
+ launchedDrivers.values.map(_.copy()).toList,
+ finishedDrivers.map(_.copy()).toList,
+ copyBuffer(pendingRetryDrivers))
+ }
+ }
+
+ override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {}
+ override def disconnected(driver: SchedulerDriver): Unit = {}
+ override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
+ logInfo(s"Framework re-registered with master ${masterInfo.getId}")
+ }
+ override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
+ override def error(driver: SchedulerDriver, error: String): Unit = {
+ logError("Error received: " + error)
+ }
+
+ /**
+ * Check if the task state is a recoverable state that we can relaunch the task.
+ * Task state like TASK_ERROR are not relaunchable state since it wasn't able
+ * to be validated by Mesos.
+ */
+ private def shouldRelaunch(state: MesosTaskState): Boolean = {
+ state == MesosTaskState.TASK_FAILED ||
+ state == MesosTaskState.TASK_KILLED ||
+ state == MesosTaskState.TASK_LOST
+ }
+
+ override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
+ val taskId = status.getTaskId.getValue
+ stateLock.synchronized {
+ if (launchedDrivers.contains(taskId)) {
+ if (status.getReason == Reason.REASON_RECONCILIATION &&
+ !pendingRecover.contains(taskId)) {
+ // Task has already received update and no longer requires reconciliation.
+ return
+ }
+ val state = launchedDrivers(taskId)
+ // Check if the driver is supervise enabled and can be relaunched.
+ if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+ removeFromLaunchedDrivers(taskId)
+ val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
+ val (retries, waitTimeSec) = retryState
+ .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
+ .getOrElse{ (1, 1) }
+ val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
+
+ val newDriverDescription = state.driverDescription.copy(
+ retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
+ pendingRetryDrivers += newDriverDescription
+ pendingRetryDriversState.persist(taskId, newDriverDescription)
+ } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
+ removeFromLaunchedDrivers(taskId)
+ if (finishedDrivers.size >= retainedDrivers) {
+ val toRemove = math.max(retainedDrivers / 10, 1)
+ finishedDrivers.trimStart(toRemove)
+ }
+ finishedDrivers += state
+ }
+ state.mesosTaskStatus = Option(status)
+ } else {
+ logError(s"Unable to find driver $taskId in status update")
+ }
+ }
+ }
+
+ override def frameworkMessage(
+ driver: SchedulerDriver,
+ executorId: ExecutorID,
+ slaveId: SlaveID,
+ message: Array[Byte]): Unit = {}
+
+ override def executorLost(
+ driver: SchedulerDriver,
+ executorId: ExecutorID,
+ slaveId: SlaveID,
+ status: Int): Unit = {}
+
+ private def removeFromQueuedDrivers(id: String): Boolean = {
+ val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
+ if (index != -1) {
+ queuedDrivers.remove(index)
+ queuedDriversState.expunge(id)
+ true
+ } else {
+ false
+ }
+ }
+
+ private def removeFromLaunchedDrivers(id: String): Boolean = {
+ if (launchedDrivers.remove(id).isDefined) {
+ launchedDriversState.expunge(id)
+ true
+ } else {
+ false
+ }
+ }
+
+ private def removeFromPendingRetryDrivers(id: String): Boolean = {
+ val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
+ if (index != -1) {
+ pendingRetryDrivers.remove(index)
+ pendingRetryDriversState.expunge(id)
+ true
+ } else {
+ false
+ }
+ }
+
+ def getQueuedDriversSize: Int = queuedDrivers.size
+ def getLaunchedDriversSize: Int = launchedDrivers.size
+ def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
new file mode 100644
index 0000000000..1fe94974c8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
+ extends Source {
+ override def sourceName: String = "mesos_cluster"
+ override def metricRegistry: MetricRegistry = new MetricRegistry()
+
+ metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] {
+ override def getValue: Int = scheduler.getQueuedDriversSize
+ })
+
+ metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] {
+ override def getValue: Int = scheduler.getLaunchedDriversSize
+ })
+
+ metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] {
+ override def getValue: Int = scheduler.getPendingRetryDriversSize
+ })
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d9d62b0e28..8346a24074 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -18,23 +18,19 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
+import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
- ExecutorInfo => MesosExecutorInfo, _}
-
+import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkException, TaskState}
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -47,14 +43,7 @@ private[spark] class MesosSchedulerBackend(
master: String)
extends SchedulerBackend
with MScheduler
- with Logging {
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
+ with MesosSchedulerUtils {
// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
@@ -73,26 +62,9 @@ private[spark] class MesosSchedulerBackend(
@volatile var appId: String = _
override def start() {
- synchronized {
- classLoader = Thread.currentThread.getContextClassLoader
-
- new Thread("MesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = MesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
+ val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
+ classLoader = Thread.currentThread.getContextClassLoader
+ startScheduler(master, MesosSchedulerBackend.this, fwInfo)
}
def createExecutorInfo(execId: String): MesosExecutorInfo = {
@@ -125,17 +97,19 @@ private[spark] class MesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val uri = sc.conf.get("spark.executor.uri", null)
+ val uri = sc.conf.getOption("spark.executor.uri")
+ .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
val executorBackendName = classOf[MesosExecutorBackend].getName
- if (uri == null) {
+ if (uri.isEmpty) {
val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
command.setValue(s"$prefixEnv $executorPath $executorBackendName")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
+ val basename = uri.get.split('/').last.split('.').head
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
val cpus = Resource.newBuilder()
.setName("cpus")
@@ -181,18 +155,7 @@ private[spark] class MesosSchedulerBackend(
inClassLoader() {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
+ markRegistered()
}
}
@@ -287,14 +250,6 @@ private[spark] class MesosSchedulerBackend(
}
}
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- 0
- }
-
/** Turn a Spark TaskDescription into a Mesos task */
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
@@ -339,13 +294,13 @@ private[spark] class MesosSchedulerBackend(
}
override def stop() {
- if (driver != null) {
- driver.stop()
+ if (mesosDriver != null) {
+ mesosDriver.stop()
}
}
override def reviveOffers() {
- driver.reviveOffers()
+ mesosDriver.reviveOffers()
}
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
@@ -380,7 +335,7 @@ private[spark] class MesosSchedulerBackend(
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
- driver.killTask(
+ mesosDriver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
new file mode 100644
index 0000000000..d11228f3d0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.List
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.JavaConversions._
+
+import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
+ * methods and Mesos scheduler will use.
+ */
+private[mesos] trait MesosSchedulerUtils extends Logging {
+ // Lock used to wait for scheduler to be registered
+ private final val registerLatch = new CountDownLatch(1)
+
+ // Driver for talking to Mesos
+ protected var mesosDriver: MesosSchedulerDriver = null
+
+ /**
+ * Starts the MesosSchedulerDriver with the provided information. This method returns
+ * only after the scheduler has registered with Mesos.
+ * @param masterUrl Mesos master connection URL
+ * @param scheduler Scheduler object
+ * @param fwInfo FrameworkInfo to pass to the Mesos master
+ */
+ def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
+ synchronized {
+ if (mesosDriver != null) {
+ registerLatch.await()
+ return
+ }
+
+ new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
+ setDaemon(true)
+
+ override def run() {
+ mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
+ try {
+ val ret = mesosDriver.run()
+ logInfo("driver.run() returned with code " + ret)
+ if (ret.equals(Status.DRIVER_ABORTED)) {
+ System.exit(1)
+ }
+ } catch {
+ case e: Exception => {
+ logError("driver.run() failed", e)
+ System.exit(1)
+ }
+ }
+ }
+ }.start()
+
+ registerLatch.await()
+ }
+ }
+
+ /**
+ * Signal that the scheduler has registered with Mesos.
+ */
+ protected def markRegistered(): Unit = {
+ registerLatch.countDown()
+ }
+
+ /**
+ * Get the amount of resources for the specified type from the resource list
+ */
+ protected def getResource(res: List[Resource], name: String): Double = {
+ for (r <- res if r.getName == name) {
+ return r.getScalar.getValue
+ }
+ 0.0
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4561e5b8e9..c4e6f06146 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -231,7 +231,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val childArgsStr = childArgs.mkString(" ")
if (useRest) {
childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
- mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient")
+ mainClass should be ("org.apache.spark.deploy.rest.RestSubmissionClient")
} else {
childArgsStr should startWith ("--supervise --memory 4g --cores 5")
childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2"
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 8e09976636..0a318a27ac 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,9 +39,9 @@ import org.apache.spark.deploy.master.DriverState._
* Tests for the REST application submission protocol used in standalone cluster mode.
*/
class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
- private val client = new StandaloneRestClient
+ private val client = new RestSubmissionClient
private var actorSystem: Option[ActorSystem] = None
- private var server: Option[StandaloneRestServer] = None
+ private var server: Option[RestSubmissionServer] = None
override def afterEach() {
actorSystem.foreach(_.shutdown())
@@ -89,7 +89,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
conf.set("spark.app.name", "dreamer")
val appArgs = Array("one", "two", "six")
// main method calls this
- val response = StandaloneRestClient.run("app-resource", "main-class", appArgs, conf)
+ val response = RestSubmissionClient.run("app-resource", "main-class", appArgs, conf)
val submitResponse = getSubmitResponse(response)
assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -208,7 +208,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("good request paths") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val json = constructSubmitRequest(masterUrl).toJson
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill"
@@ -238,7 +238,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("good request paths, bad requests") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill"
val statusRequestPath = s"$httpUrl/$v/submissions/status"
@@ -276,7 +276,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("bad request paths") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val (response1, code1) = sendHttpRequestWithResponse(httpUrl, "GET")
val (response2, code2) = sendHttpRequestWithResponse(s"$httpUrl/", "GET")
val (response3, code3) = sendHttpRequestWithResponse(s"$httpUrl/$v", "GET")
@@ -292,7 +292,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
- assert(code8 === StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
+ assert(code8 === RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
// all responses should be error responses
val errorResponse1 = getErrorResponse(response1)
val errorResponse2 = getErrorResponse(response2)
@@ -310,13 +310,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
assert(errorResponse5.highestProtocolVersion === null)
assert(errorResponse6.highestProtocolVersion === null)
assert(errorResponse7.highestProtocolVersion === null)
- assert(errorResponse8.highestProtocolVersion === StandaloneRestServer.PROTOCOL_VERSION)
+ assert(errorResponse8.highestProtocolVersion === RestSubmissionServer.PROTOCOL_VERSION)
}
test("server returns unknown fields") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val oldJson = constructSubmitRequest(masterUrl).toJson
val oldFields = parse(oldJson).asInstanceOf[JObject].obj
@@ -340,7 +340,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("client handles faulty server") {
val masterUrl = startFaultyServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill/anything"
val statusRequestPath = s"$httpUrl/$v/submissions/status/anything"
@@ -400,9 +400,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val fakeMasterRef = _actorSystem.actorOf(Props(makeFakeMaster))
val _server =
if (faulty) {
- new FaultyStandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
+ new FaultyStandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
} else {
- new StandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
+ new StandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
}
val port = _server.start()
// set these to clean them up after every test
@@ -563,20 +563,18 @@ private class SmarterMaster extends Actor {
private class FaultyStandaloneRestServer(
host: String,
requestedPort: Int,
+ masterConf: SparkConf,
masterActor: ActorRef,
- masterUrl: String,
- masterConf: SparkConf)
- extends StandaloneRestServer(host, requestedPort, masterActor, masterUrl, masterConf) {
+ masterUrl: String)
+ extends RestSubmissionServer(host, requestedPort, masterConf) {
- protected override val contextToServlet = Map[String, StandaloneRestServlet](
- s"$baseContext/create/*" -> new MalformedSubmitServlet,
- s"$baseContext/kill/*" -> new InvalidKillServlet,
- s"$baseContext/status/*" -> new ExplodingStatusServlet,
- "/*" -> new ErrorServlet
- )
+ protected override val submitRequestServlet = new MalformedSubmitServlet
+ protected override val killRequestServlet = new InvalidKillServlet
+ protected override val statusRequestServlet = new ExplodingStatusServlet
/** A faulty servlet that produces malformed responses. */
- class MalformedSubmitServlet extends SubmitRequestServlet(masterActor, masterUrl, masterConf) {
+ class MalformedSubmitServlet
+ extends StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf) {
protected override def sendResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): Unit = {
@@ -586,7 +584,7 @@ private class FaultyStandaloneRestServer(
}
/** A faulty servlet that produces invalid responses. */
- class InvalidKillServlet extends KillRequestServlet(masterActor, masterConf) {
+ class InvalidKillServlet extends StandaloneKillRequestServlet(masterActor, masterConf) {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = super.handleKill(submissionId)
k.submissionId = null
@@ -595,7 +593,7 @@ private class FaultyStandaloneRestServer(
}
/** A faulty status servlet that explodes. */
- class ExplodingStatusServlet extends StatusRequestServlet(masterActor, masterConf) {
+ class ExplodingStatusServlet extends StandaloneStatusRequestServlet(masterActor, masterConf) {
private def explode: Int = 1 / 0
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
val s = super.handleStatus(submissionId)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000000..f28e29e9b8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.mesos
+
+import java.util.Date
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.{LocalSparkContext, SparkConf}
+
+
+class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with MockitoSugar {
+
+ private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
+
+ test("can queue drivers") {
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ val scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = { ready = true }
+ }
+ scheduler.start()
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val response2 =
+ scheduler.submitDriver(new MesosDriverDescription(
+ "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+ assert(response2.success)
+ val state = scheduler.getSchedulerState()
+ val queuedDrivers = state.queuedDrivers.toList
+ assert(queuedDrivers(0).submissionId == response.submissionId)
+ assert(queuedDrivers(1).submissionId == response2.submissionId)
+ }
+
+ test("can kill queued drivers") {
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ val scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = { ready = true }
+ }
+ scheduler.start()
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val killResponse = scheduler.killDriver(response.submissionId)
+ assert(killResponse.success)
+ val state = scheduler.getSchedulerState()
+ assert(state.queuedDrivers.isEmpty)
+ }
+}
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 594bf78b67..8f53d8201a 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -78,6 +78,9 @@ To verify that the Mesos cluster is ready for Spark, navigate to the Mesos maste
To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and
a Spark driver program configured to connect to Mesos.
+Alternatively, you can also install Spark in the same location in all the Mesos slaves, and configure
+`spark.mesos.executor.home` (defaults to SPARK_HOME) to point to that location.
+
## Uploading Spark Package
When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
@@ -107,7 +110,11 @@ the `make-distribution.sh` script included in a Spark source tarball/checkout.
The Master URLs for Mesos are in the form `mesos://host:5050` for a single-master Mesos
cluster, or `mesos://zk://host:2181` for a multi-master Mesos cluster using ZooKeeper.
-The driver also needs some configuration in `spark-env.sh` to interact properly with Mesos:
+## Client Mode
+
+In client mode, a Spark Mesos framework is launched directly on the client machine and waits for the driver output.
+
+The driver needs some configuration in `spark-env.sh` to interact properly with Mesos:
1. In `spark-env.sh` set some environment variables:
* `export MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so>`. This path is typically
@@ -129,8 +136,7 @@ val sc = new SparkContext(conf)
{% endhighlight %}
(You can also use [`spark-submit`](submitting-applications.html) and configure `spark.executor.uri`
-in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file. Note
-that `spark-submit` currently only supports deploying the Spark driver in `client` mode for Mesos.)
+in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file.)
When running a shell, the `spark.executor.uri` parameter is inherited from `SPARK_EXECUTOR_URI`, so
it does not need to be redundantly passed in as a system property.
@@ -139,6 +145,17 @@ it does not need to be redundantly passed in as a system property.
./bin/spark-shell --master mesos://host:5050
{% endhighlight %}
+## Cluster mode
+
+Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client
+can find the results of the driver from the Mesos Web UI.
+
+To use cluster mode, you must start the MesosClusterDispatcher in your cluster via the `sbin/start-mesos-dispatcher.sh` script,
+passing in the Mesos master url (e.g: mesos://host:5050).
+
+From the client, you can submit a job to Mesos cluster by running `spark-submit` and specifying the master url
+to the url of the MesosClusterDispatcher (e.g: mesos://dispatcher:7077). You can view driver statuses on the
+Spark cluster Web UI.
# Mesos Run Modes
diff --git a/sbin/start-mesos-dispatcher.sh b/sbin/start-mesos-dispatcher.sh
new file mode 100755
index 0000000000..ef1fc573d5
--- /dev/null
+++ b/sbin/start-mesos-dispatcher.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Starts the Mesos Cluster Dispatcher on the machine this script is executed on.
+# The Mesos Cluster Dispatcher is responsible for launching the Mesos framework and
+# Rest server to handle driver requests for Mesos cluster mode.
+# Only one cluster dispatcher is needed per Mesos cluster.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+if [ "$SPARK_MESOS_DISPATCHER_PORT" = "" ]; then
+ SPARK_MESOS_DISPATCHER_PORT=7077
+fi
+
+if [ "$SPARK_MESOS_DISPATCHER_HOST" = "" ]; then
+ SPARK_MESOS_DISPATCHER_HOST=`hostname`
+fi
+
+
+"$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosClusterDispatcher 1 --host $SPARK_MESOS_DISPATCHER_HOST --port $SPARK_MESOS_DISPATCHER_PORT "$@"
diff --git a/sbin/stop-mesos-dispatcher.sh b/sbin/stop-mesos-dispatcher.sh
new file mode 100755
index 0000000000..cb65d95b5e
--- /dev/null
+++ b/sbin/stop-mesos-dispatcher.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Stop the Mesos Cluster dispatcher on the machine this script is executed on.
+
+sbin=`dirname "$0"`
+sbin=`cd "$sbin"; pwd`
+
+. "$sbin/spark-config.sh"
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.mesos.MesosClusterDispatcher 1
+