author     Timothy Chen <tnachen@gmail.com>    2015-04-28 13:31:08 -0700
committer  Andrew Or <andrew@databricks.com>   2015-04-28 13:33:57 -0700
commit     53befacced828bbac53c6e3a4976ec3f036bae9e (patch)
tree       edd8acc9ff5b60698dd579fd07b35ae977d83cfe /core/src/test
parent     80098109d908b738b43d397e024756ff617d0af4 (diff)
[SPARK-5338] [MESOS] Add cluster mode support for Mesos
This patch adds support for running Spark in cluster mode on Mesos. It introduces a new Mesos framework dedicated to launching apps/drivers, which is reached from the spark-submit script by pointing the --master flag at the cluster mode REST interface instead of at the Mesos master. Example:

./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master mesos://10.0.0.206:8077 --executor-memory 1G --total-executor-cores 100 examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar 30

This patch also abstracts the StandaloneRestServer so that the REST endpoints can have different implementations.

Features of the cluster mode in this PR:
- Supports supervise mode, where the scheduler keeps trying to reschedule a job that has exited.
- Adds a new UI for the cluster mode scheduler showing all running jobs, finished jobs, and supervised jobs waiting to be retried.
- Supports state persistence to ZooKeeper, so when the cluster scheduler fails over it can pick up all queued and running jobs.

Author: Timothy Chen <tnachen@gmail.com>
Author: Luc Bourlier <luc.bourlier@typesafe.com>

Closes #5144 from tnachen/mesos_cluster_mode and squashes the following commits:

069e946 [Timothy Chen] Fix rebase.
e24b512 [Timothy Chen] Persist submitted driver.
390c491 [Timothy Chen] Fix zk conf key for mesos zk engine.
e324ac1 [Timothy Chen] Fix merge.
fd5259d [Timothy Chen] Address review comments.
1553230 [Timothy Chen] Address review comments.
c6c6b73 [Timothy Chen] Pass spark properties to mesos cluster tasks.
f7d8046 [Timothy Chen] Change app name to spark cluster.
17f93a2 [Timothy Chen] Fix head of line blocking in scheduling drivers.
6ff8e5c [Timothy Chen] Address comments and add logging.
df355cd [Timothy Chen] Add metrics to mesos cluster scheduler.
20f7284 [Timothy Chen] Address review comments
7252612 [Timothy Chen] Fix tests.
a46ad66 [Timothy Chen] Allow zk cli param override.
920fc4b [Timothy Chen] Fix scala style issues.
862b5b5 [Timothy Chen] Support asking driver status when it's retrying.
7f214c2 [Timothy Chen] Fix RetryState visibility
e0f33f7 [Timothy Chen] Add supervise support and persist retries.
371ce65 [Timothy Chen] Handle cluster mode recovery and state persistence.
3d4dfa1 [Luc Bourlier] Adds support to kill submissions
febfaba [Timothy Chen] Bound the finished drivers in memory
543a98d [Timothy Chen] Schedule multiple jobs
6887e5e [Timothy Chen] Support looking at SPARK_EXECUTOR_URI env variable in schedulers
8ec76bc [Timothy Chen] Fix Mesos dispatcher UI.
d57d77d [Timothy Chen] Add documentation
825afa0 [Luc Bourlier] Supports more spark-submit parameters
b8e7181 [Luc Bourlier] Adds a shutdown latch to keep the daemon running
0fa7780 [Luc Bourlier] Launch task through the mesos scheduler
5b7a12b [Timothy Chen] WIP: Making a cluster mode a mesos framework.
4b2f5ef [Timothy Chen] Specify user jar in command to be replaced with local.
e775001 [Timothy Chen] Support fetching remote uris in driver runner.
7179495 [Timothy Chen] Change Driver page output and add logging
880bc27 [Timothy Chen] Add Mesos Cluster UI to display driver results
9986731 [Timothy Chen] Kill drivers when shutdown
67cbc18 [Timothy Chen] Rename StandaloneRestClient to RestClient and add sbin scripts
e3facdd [Timothy Chen] Add Mesos Cluster dispatcher
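As a rough illustration of the supervise semantics described above, here is a minimal, self-contained Scala sketch. The types and the exponential backoff policy are stand-ins chosen for illustration; the patch's actual RetryState API and retry policy may differ.

    // Stand-in types and an assumed backoff policy; not the patch's real API.
    final case class RetryState(retries: Int, nextRetryAtMillis: Long)
    final case class Submission(id: String, supervise: Boolean, retry: RetryState)

    object SuperviseSketch {
      // When a driver exits: requeue supervised failures with backoff,
      // drop everything else from the scheduler's bookkeeping.
      def onDriverExited(s: Submission, exitedCleanly: Boolean, nowMillis: Long): Option[Submission] =
        if (s.supervise && !exitedCleanly) {
          val backoffMillis = (1L << math.min(s.retry.retries, 10)) * 1000L  // capped exponential backoff
          Some(s.copy(retry = RetryState(s.retry.retries + 1, nowMillis + backoffMillis)))
        } else {
          None  // clean exit (or unsupervised): nothing left to reschedule
        }

      def main(args: Array[String]): Unit = {
        val s = Submission("driver-1", supervise = true, RetryState(0, 0L))
        println(onDriverExited(s, exitedCleanly = false, nowMillis = 0L))  // Some(..., retries = 1)
      }
    }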
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                    |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala      | 46
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala | 76
3 files changed, 99 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4561e5b8e9..c4e6f06146 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -231,7 +231,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val childArgsStr = childArgs.mkString(" ")
if (useRest) {
childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
- mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient")
+ mainClass should be ("org.apache.spark.deploy.rest.RestSubmissionClient")
} else {
childArgsStr should startWith ("--supervise --memory 4g --cores 5")
childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2"
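The updated assertion reflects the rename: in REST mode, spark-submit now hands the submission off to RestSubmissionClient as the child main class. A tiny self-contained sketch of the selection this test asserts follows; the helper and the class name in the non-REST branch are assumptions for illustration, not SparkSubmit's actual code.

    // Hypothetical helper mirroring the contract the test asserts.
    object ChildMainClassSketch {
      def mainClassFor(useRest: Boolean): String =
        if (useRest) "org.apache.spark.deploy.rest.RestSubmissionClient"  // renamed REST client
        else "org.apache.spark.deploy.Client"                             // assumed legacy launch path
      def main(args: Array[String]): Unit =
        println(mainClassFor(useRest = true))
    }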
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 8e09976636..0a318a27ac 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,9 +39,9 @@ import org.apache.spark.deploy.master.DriverState._
* Tests for the REST application submission protocol used in standalone cluster mode.
*/
class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
- private val client = new StandaloneRestClient
+ private val client = new RestSubmissionClient
private var actorSystem: Option[ActorSystem] = None
- private var server: Option[StandaloneRestServer] = None
+ private var server: Option[RestSubmissionServer] = None
override def afterEach() {
actorSystem.foreach(_.shutdown())
@@ -89,7 +89,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
conf.set("spark.app.name", "dreamer")
val appArgs = Array("one", "two", "six")
// main method calls this
- val response = StandaloneRestClient.run("app-resource", "main-class", appArgs, conf)
+ val response = RestSubmissionClient.run("app-resource", "main-class", appArgs, conf)
val submitResponse = getSubmitResponse(response)
assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -208,7 +208,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("good request paths") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val json = constructSubmitRequest(masterUrl).toJson
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill"
@@ -238,7 +238,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("good request paths, bad requests") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill"
val statusRequestPath = s"$httpUrl/$v/submissions/status"
@@ -276,7 +276,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("bad request paths") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val (response1, code1) = sendHttpRequestWithResponse(httpUrl, "GET")
val (response2, code2) = sendHttpRequestWithResponse(s"$httpUrl/", "GET")
val (response3, code3) = sendHttpRequestWithResponse(s"$httpUrl/$v", "GET")
@@ -292,7 +292,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
- assert(code8 === StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
+ assert(code8 === RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
// all responses should be error responses
val errorResponse1 = getErrorResponse(response1)
val errorResponse2 = getErrorResponse(response2)
@@ -310,13 +310,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
assert(errorResponse5.highestProtocolVersion === null)
assert(errorResponse6.highestProtocolVersion === null)
assert(errorResponse7.highestProtocolVersion === null)
- assert(errorResponse8.highestProtocolVersion === StandaloneRestServer.PROTOCOL_VERSION)
+ assert(errorResponse8.highestProtocolVersion === RestSubmissionServer.PROTOCOL_VERSION)
}
test("server returns unknown fields") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val oldJson = constructSubmitRequest(masterUrl).toJson
val oldFields = parse(oldJson).asInstanceOf[JObject].obj
@@ -340,7 +340,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("client handles faulty server") {
val masterUrl = startFaultyServer()
val httpUrl = masterUrl.replace("spark://", "http://")
- val v = StandaloneRestServer.PROTOCOL_VERSION
+ val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill/anything"
val statusRequestPath = s"$httpUrl/$v/submissions/status/anything"
@@ -400,9 +400,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val fakeMasterRef = _actorSystem.actorOf(Props(makeFakeMaster))
val _server =
if (faulty) {
- new FaultyStandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
+ new FaultyStandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
} else {
- new StandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
+ new StandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
}
val port = _server.start()
// set these to clean them up after every test
@@ -563,20 +563,18 @@ private class SmarterMaster extends Actor {
private class FaultyStandaloneRestServer(
host: String,
requestedPort: Int,
+ masterConf: SparkConf,
masterActor: ActorRef,
- masterUrl: String,
- masterConf: SparkConf)
- extends StandaloneRestServer(host, requestedPort, masterActor, masterUrl, masterConf) {
+ masterUrl: String)
+ extends RestSubmissionServer(host, requestedPort, masterConf) {
- protected override val contextToServlet = Map[String, StandaloneRestServlet](
- s"$baseContext/create/*" -> new MalformedSubmitServlet,
- s"$baseContext/kill/*" -> new InvalidKillServlet,
- s"$baseContext/status/*" -> new ExplodingStatusServlet,
- "/*" -> new ErrorServlet
- )
+ protected override val submitRequestServlet = new MalformedSubmitServlet
+ protected override val killRequestServlet = new InvalidKillServlet
+ protected override val statusRequestServlet = new ExplodingStatusServlet
/** A faulty servlet that produces malformed responses. */
- class MalformedSubmitServlet extends SubmitRequestServlet(masterActor, masterUrl, masterConf) {
+ class MalformedSubmitServlet
+ extends StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf) {
protected override def sendResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): Unit = {
@@ -586,7 +584,7 @@ private class FaultyStandaloneRestServer(
}
/** A faulty servlet that produces invalid responses. */
- class InvalidKillServlet extends KillRequestServlet(masterActor, masterConf) {
+ class InvalidKillServlet extends StandaloneKillRequestServlet(masterActor, masterConf) {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = super.handleKill(submissionId)
k.submissionId = null
@@ -595,7 +593,7 @@ private class FaultyStandaloneRestServer(
}
/** A faulty status servlet that explodes. */
- class ExplodingStatusServlet extends StatusRequestServlet(masterActor, masterConf) {
+ class ExplodingStatusServlet extends StandaloneStatusRequestServlet(masterActor, masterConf) {
private def explode: Int = 1 / 0
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
val s = super.handleStatus(submissionId)
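The refactor above replaces the context-to-servlet map with three overridable servlet members on RestSubmissionServer, so each backend (standalone, Mesos, or a faulty test double like this one) supplies only its handlers while the base class keeps the URL routing. A minimal, self-contained sketch of that template pattern, using stand-in types rather than Spark's real classes:

    trait RequestServlet {
      def handle(body: String): String
    }

    // Base class owns the routing; subclasses plug in the three servlets.
    abstract class SubmissionServerSketch {
      protected val submitRequestServlet: RequestServlet
      protected val killRequestServlet: RequestServlet
      protected val statusRequestServlet: RequestServlet

      def route(path: String, body: String): String = path match {
        case p if p.startsWith("/submissions/create") => submitRequestServlet.handle(body)
        case p if p.startsWith("/submissions/kill")   => killRequestServlet.handle(body)
        case p if p.startsWith("/submissions/status") => statusRequestServlet.handle(body)
        case _                                        => """{"error":"unknown path"}"""
      }
    }

    object ServerSketchDemo {
      def main(args: Array[String]): Unit = {
        val ok = new RequestServlet { def handle(body: String) = s"""{"ok":true,"echo":"$body"}""" }
        val server = new SubmissionServerSketch {
          protected val submitRequestServlet = ok
          protected val killRequestServlet   = ok
          protected val statusRequestServlet = ok
        }
        println(server.route("/submissions/create", "{}"))
      }
    }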
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000000..f28e29e9b8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.mesos
+
+import java.util.Date
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.{LocalSparkContext, SparkConf}
+
+
+class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with MockitoSugar {
+
+ private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
+
+ test("can queue drivers") {
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ val scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = { ready = true }
+ }
+ scheduler.start()
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val response2 =
+ scheduler.submitDriver(new MesosDriverDescription(
+ "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+ assert(response2.success)
+ val state = scheduler.getSchedulerState()
+ val queuedDrivers = state.queuedDrivers.toList
+ assert(queuedDrivers(0).submissionId == response.submissionId)
+ assert(queuedDrivers(1).submissionId == response2.submissionId)
+ }
+
+ test("can kill queued drivers") {
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ val scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = { ready = true }
+ }
+ scheduler.start()
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val killResponse = scheduler.killDriver(response.submissionId)
+ assert(killResponse.success)
+ val state = scheduler.getSchedulerState()
+ assert(state.queuedDrivers.isEmpty)
+ }
+}
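For context on the BlackHoleMesosClusterPersistenceEngineFactory used above: it lets the suite exercise the scheduler without ZooKeeper by discarding all persisted state. A self-contained sketch of that no-op "black hole" pattern, with a stand-in trait rather than Spark's real persistence API:

    trait PersistenceEngineSketch {
      def persist(name: String, value: Array[Byte]): Unit
      def expunge(name: String): Unit
      def fetchAll(): Seq[Array[Byte]]
    }

    // Every write is dropped and every read is empty, so unit tests need
    // no external store (e.g. ZooKeeper) while recovery code paths still run.
    object BlackHoleEngine extends PersistenceEngineSketch {
      def persist(name: String, value: Array[Byte]): Unit = ()
      def expunge(name: String): Unit = ()
      def fetchAll(): Seq[Array[Byte]] = Nil

      def main(args: Array[String]): Unit = {
        persist("driver-1", Array[Byte](1, 2, 3))
        println(fetchAll().isEmpty)  // true: nothing survives, by design
      }
    }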