author    WangTaoTheTonic <wangtao111@huawei.com>  2015-05-01 18:38:20 -0700
committer Andrew Or <andrew@databricks.com>        2015-05-01 18:38:20 -0700
commit    b4b43df8a338a30c0eadcf10cbe3ba203dc3f861 (patch)
tree      1cce6e72d9c907398ef6ffb89986ae03c8ba729b /core/src
parent    2022193412e832393a29b94609841c3ffe8e3d66 (diff)
[SPARK-6443] [SPARK SUBMIT] Could not submit app in standalone cluster mode when HA is enabled
**3/26 update:**

* Akka-based: Use an array of `ActorSelection` to represent the multiple masters. Add an `activeMasterActor` for querying driver status. Lost masters (including standby ones) are added to `lostMasters`; when the size of `lostMasters` is greater than or equal to the number of masters, we report an error that no master is available.
* Rest-based: When all masters are unavailable (the client throws an exception), we fall back to the Akka gateway to submit apps.

I have done simple testing on a standalone HA cluster (with both masters alive, and with one alive/one dead), and it worked. Some style or message-printing issues may remain, but we can review the solution and then fix them together.

/cc srowen andrewor14

Author: WangTaoTheTonic <wangtao111@huawei.com>

Closes #5116 from WangTaoTheTonic/SPARK-6443 and squashes the following commits:

2a28aab [WangTaoTheTonic] based on the newest change https://github.com/apache/spark/pull/5144
76fd411 [WangTaoTheTonic] rebase
f4f972b [WangTaoTheTonic] rebase...again
a41de0b [WangTaoTheTonic] rebase
220cb3c [WangTaoTheTonic] move connect exception inside
35119a0 [WangTaoTheTonic] style and compile issues
9d636be [WangTaoTheTonic] per Andrew's comments
979760c [WangTaoTheTonic] rebase
e4f4ece [WangTaoTheTonic] fix failed test
5d23958 [WangTaoTheTonic] refactor some duplicated code, style and comments
7a881b3 [WangTaoTheTonic] when one of the masters is gone, we can still submit
2b011c9 [WangTaoTheTonic] fix broken tests
60d97a4 [WangTaoTheTonic] rebase
fa1fa80 [WangTaoTheTonic] submit app to HA cluster in standalone cluster mode
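For readers skimming the diff below, here is a minimal, self-contained sketch of the failover pattern this patch applies on both paths: try every master, treat a reply from a standby as non-final, and give up only once every master has been lost. Names such as `submitToFirstAliveMaster` and the `submitTo` callback are illustrative only, not part of the patch.

```scala
import java.net.ConnectException
import scala.collection.mutable

object FailoverSketch {
  // Prefix standby masters put in their responses
  // (Utils.BACKUP_STANDALONE_MASTER_PREFIX in the diff below).
  private val BackupPrefix = "Current state is not alive"

  /** Try each master in turn; stop at the first reply not sent by a standby. */
  def submitToFirstAliveMaster(
      masters: Seq[String],
      submitTo: String => String): Option[String] = {
    val lostMasters = mutable.HashSet.empty[String]
    var result: Option[String] = None
    for (m <- masters if result.isEmpty) {
      try {
        val reply = submitTo(m)
        if (!reply.startsWith(BackupPrefix)) {
          result = Some(reply) // the active master answered
        }
      } catch {
        case _: ConnectException =>
          lostMasters += m
          if (lostMasters.size >= masters.size) {
            return None // every master is unreachable: nothing left to try
          }
      }
    }
    result
  }
}
```

The Akka-based client reaches the same outcome through remoting events (`DisassociatedEvent`, `AssociationErrorEvent`) rather than a loop, but the lost-master counting is the same.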
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala                          73
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala                  9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                      8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                   24
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala      152
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala           2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                             16
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala  40
8 files changed, 229 insertions(+), 95 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c2c3e9a9e4..848b62f9de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy
+import scala.collection.mutable.HashSet
import scala.concurrent._
import akka.actor._
@@ -31,21 +32,24 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
/**
* Proxy that relays messages to the driver.
+ *
+ * We currently don't support retry if submission fails. In HA mode, client will submit request to
+ * all masters and see which one could handle it.
*/
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {
- var masterActor: ActorSelection = _
+ private val masterActors = driverArgs.masters.map { m =>
+ context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
+ }
+ private val lostMasters = new HashSet[Address]
+ private var activeMasterActor: ActorSelection = null
+
val timeout = RpcUtils.askTimeout(conf)
override def preStart(): Unit = {
- masterActor = context.actorSelection(
- Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
-
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
-
driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -79,11 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
driverArgs.supervise,
command)
- masterActor ! RequestSubmitDriver(driverDescription)
+ // This assumes only one Master is active at a time
+ for (masterActor <- masterActors) {
+ masterActor ! RequestSubmitDriver(driverDescription)
+ }
case "kill" =>
val driverId = driverArgs.driverId
- masterActor ! RequestKillDriver(driverId)
+ // This assumes only one Master is active at a time
+ for (masterActor <- masterActors) {
+ masterActor ! RequestKillDriver(driverId)
+ }
}
}
@@ -92,10 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
- val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
+ val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)
-
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
@@ -122,20 +131,46 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
case SubmitDriverResponse(success, driverId, message) =>
println(message)
- if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
+ if (success) {
+ activeMasterActor = context.actorSelection(sender.path)
+ pollAndReportStatus(driverId.get)
+ } else if (!Utils.responseFromBackup(message)) {
+ System.exit(-1)
+ }
+
case KillDriverResponse(driverId, success, message) =>
println(message)
- if (success) pollAndReportStatus(driverId) else System.exit(-1)
+ if (success) {
+ activeMasterActor = context.actorSelection(sender.path)
+ pollAndReportStatus(driverId)
+ } else if (!Utils.responseFromBackup(message)) {
+ System.exit(-1)
+ }
case DisassociatedEvent(_, remoteAddress, _) =>
- println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
- System.exit(-1)
+ if (!lostMasters.contains(remoteAddress)) {
+ println(s"Error connecting to master $remoteAddress.")
+ lostMasters += remoteAddress
+ // Note that this heuristic does not account for the fact that a Master can recover within
+ // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
+ // is not currently a concern, however, because this client does not retry submissions.
+ if (lostMasters.size >= masterActors.size) {
+ println("No master is available, exiting.")
+ System.exit(-1)
+ }
+ }
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
- println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
- println(s"Cause was: $cause")
- System.exit(-1)
+ if (!lostMasters.contains(remoteAddress)) {
+ println(s"Error connecting to master ($remoteAddress).")
+ println(s"Cause was: $cause")
+ lostMasters += remoteAddress
+ if (lostMasters.size >= masterActors.size) {
+ println("No master is available, exiting.")
+ System.exit(-1)
+ }
+ }
}
}
@@ -163,7 +198,9 @@ object Client {
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
- Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
+ for (m <- driverArgs.masters) {
+ Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
+ }
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
actorSystem.awaitTermination()
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 5cbac787dc..316e2d59f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -22,8 +22,7 @@ import java.net.{URI, URISyntaxException}
import scala.collection.mutable.ListBuffer
import org.apache.log4j.Level
-
-import org.apache.spark.util.{IntParam, MemoryParam}
+import org.apache.spark.util.{IntParam, MemoryParam, Utils}
/**
* Command-line parser for the driver client.
@@ -35,7 +34,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
var logLevel = Level.WARN
// launch parameters
- var master: String = ""
+ var masters: Array[String] = null
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = DEFAULT_SUPERVISE
@@ -80,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
}
jarUrl = _jarUrl
- master = _master
+ masters = Utils.parseStandaloneMasterUrls(_master)
mainClass = _mainClass
_driverOptions ++= tail
case "kill" :: _master :: _driverId :: tail =>
cmd = "kill"
- master = _master
+ masters = Utils.parseStandaloneMasterUrls(_master)
driverId = _driverId
case _ =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index af38bf80e4..42b5d41b7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -118,8 +118,8 @@ object SparkSubmit {
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
*/
private def kill(args: SparkSubmitArguments): Unit = {
- new RestSubmissionClient()
- .killSubmission(args.master, args.submissionToKill)
+ new RestSubmissionClient(args.master)
+ .killSubmission(args.submissionToKill)
}
/**
@@ -127,8 +127,8 @@ object SparkSubmit {
* Standalone and Mesos cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
- new RestSubmissionClient()
- .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
+ new RestSubmissionClient(args.master)
+ .requestSubmissionStatus(args.submissionToRequestStatusFor)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dc6077f3d1..0fac3cdcf5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -254,7 +254,8 @@ private[master] class Master(
case RequestSubmitDriver(description) => {
if (state != RecoveryState.ALIVE) {
- val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
+ val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+ "Can only accept driver submissions in ALIVE state."
sender ! SubmitDriverResponse(false, None, msg)
} else {
logInfo("Driver submitted " + description.command.mainClass)
@@ -274,7 +275,8 @@ private[master] class Master(
case RequestKillDriver(driverId) => {
if (state != RecoveryState.ALIVE) {
- val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
+ val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+ s"Can only kill drivers in ALIVE state."
sender ! KillDriverResponse(driverId, success = false, msg)
} else {
logInfo("Asked to kill driver " + driverId)
@@ -305,12 +307,18 @@ private[master] class Master(
}
case RequestDriverStatus(driverId) => {
- (drivers ++ completedDrivers).find(_.id == driverId) match {
- case Some(driver) =>
- sender ! DriverStatusResponse(found = true, Some(driver.state),
- driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
- case None =>
- sender ! DriverStatusResponse(found = false, None, None, None, None)
+ if (state != RecoveryState.ALIVE) {
+ val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+ "Can only request driver status in ALIVE state."
+ sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
+ } else {
+ (drivers ++ completedDrivers).find(_.id == driverId) match {
+ case Some(driver) =>
+ sender ! DriverStatusResponse(found = true, Some(driver.state),
+ driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
+ case None =>
+ sender ! DriverStatusResponse(found = false, None, None, None, None)
+ }
}
}
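To make the standby detection concrete: with this change a master that is not ALIVE replies with a fixed prefix, which the client side matches via `Utils.responseFromBackup`. A quick illustration — the constant and predicate are copied from the `Utils` changes later in this diff, while `standbyMsg` is just an example value:

```scala
// Copied from the Utils changes later in this diff.
val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive"

def responseFromBackup(msg: String): Boolean =
  msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)

// What a STANDBY master now sends back for a driver submission:
val standbyMsg = s"$BACKUP_STANDALONE_MASTER_PREFIX: STANDBY. " +
  "Can only accept driver submissions in ALIVE state."

assert(responseFromBackup(standbyMsg))  // client ignores it and keeps trying
assert(!responseFromBackup("Driver successfully submitted"))  // hypothetical success text
```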
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 307cebfb4b..6078f50518 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -18,9 +18,10 @@
package org.apache.spark.deploy.rest
import java.io.{DataOutputStream, FileNotFoundException}
-import java.net.{HttpURLConnection, SocketException, URL}
+import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
import javax.servlet.http.HttpServletResponse
+import scala.collection.mutable
import scala.io.Source
import com.fasterxml.jackson.core.JsonProcessingException
@@ -51,57 +52,109 @@ import org.apache.spark.util.Utils
* implementation of this client can use that information to retry using the version specified
* by the server.
*/
-private[spark] class RestSubmissionClient extends Logging {
+private[spark] class RestSubmissionClient(master: String) extends Logging {
import RestSubmissionClient._
private val supportedMasterPrefixes = Seq("spark://", "mesos://")
+ private val masters: Array[String] = Utils.parseStandaloneMasterUrls(master)
+
+ // Set of masters that lost contact with us, used to keep track of
+ // whether there are masters still alive for us to communicate with
+ private val lostMasters = new mutable.HashSet[String]
+
/**
* Submit an application specified by the parameters in the provided request.
*
* If the submission was successful, poll the status of the submission and report
* it to the user. Otherwise, report the error message provided by the server.
*/
- def createSubmission(
- master: String,
- request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
+ def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to launch an application in $master.")
- validateMaster(master)
- val url = getSubmitUrl(master)
- val response = postJson(url, request.toJson)
- response match {
- case s: CreateSubmissionResponse =>
- reportSubmissionStatus(master, s)
- handleRestResponse(s)
- case unexpected =>
- handleUnexpectedRestResponse(unexpected)
+ var handled: Boolean = false
+ var response: SubmitRestProtocolResponse = null
+ for (m <- masters if !handled) {
+ validateMaster(m)
+ val url = getSubmitUrl(m)
+ try {
+ response = postJson(url, request.toJson)
+ response match {
+ case s: CreateSubmissionResponse =>
+ if (s.success) {
+ reportSubmissionStatus(s)
+ handleRestResponse(s)
+ handled = true
+ }
+ case unexpected =>
+ handleUnexpectedRestResponse(unexpected)
+ }
+ } catch {
+ case e: SubmitRestConnectionException =>
+ if (handleConnectionException(m)) {
+ throw new SubmitRestConnectionException("Unable to connect to server", e)
+ }
+ }
}
response
}
/** Request that the server kill the specified submission. */
- def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
+ def killSubmission(submissionId: String): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to kill submission $submissionId in $master.")
- validateMaster(master)
- val response = post(getKillUrl(master, submissionId))
- response match {
- case k: KillSubmissionResponse => handleRestResponse(k)
- case unexpected => handleUnexpectedRestResponse(unexpected)
+ var handled: Boolean = false
+ var response: SubmitRestProtocolResponse = null
+ for (m <- masters if !handled) {
+ validateMaster(m)
+ val url = getKillUrl(m, submissionId)
+ try {
+ response = post(url)
+ response match {
+ case k: KillSubmissionResponse =>
+ if (!Utils.responseFromBackup(k.message)) {
+ handleRestResponse(k)
+ handled = true
+ }
+ case unexpected =>
+ handleUnexpectedRestResponse(unexpected)
+ }
+ } catch {
+ case e: SubmitRestConnectionException =>
+ if (handleConnectionException(m)) {
+ throw new SubmitRestConnectionException("Unable to connect to server", e)
+ }
+ }
}
response
}
/** Request the status of a submission from the server. */
def requestSubmissionStatus(
- master: String,
submissionId: String,
quiet: Boolean = false): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
- validateMaster(master)
- val response = get(getStatusUrl(master, submissionId))
- response match {
- case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
- case unexpected => handleUnexpectedRestResponse(unexpected)
+
+ var handled: Boolean = false
+ var response: SubmitRestProtocolResponse = null
+ for (m <- masters if !handled) {
+ validateMaster(m)
+ val url = getStatusUrl(m, submissionId)
+ try {
+ response = get(url)
+ response match {
+ case s: SubmissionStatusResponse if s.success =>
+ if (!quiet) {
+ handleRestResponse(s)
+ }
+ handled = true
+ case unexpected =>
+ handleUnexpectedRestResponse(unexpected)
+ }
+ } catch {
+ case e: SubmitRestConnectionException =>
+ if (handleConnectionException(m)) {
+ throw new SubmitRestConnectionException("Unable to connect to server", e)
+ }
+ }
}
response
}
@@ -148,11 +201,16 @@ private[spark] class RestSubmissionClient extends Logging {
conn.setRequestProperty("Content-Type", "application/json")
conn.setRequestProperty("charset", "utf-8")
conn.setDoOutput(true)
- val out = new DataOutputStream(conn.getOutputStream)
- Utils.tryWithSafeFinally {
- out.write(json.getBytes(Charsets.UTF_8))
- } {
- out.close()
+ try {
+ val out = new DataOutputStream(conn.getOutputStream)
+ Utils.tryWithSafeFinally {
+ out.write(json.getBytes(Charsets.UTF_8))
+ } {
+ out.close()
+ }
+ } catch {
+ case e: ConnectException =>
+ throw new SubmitRestConnectionException("Connect Exception when connect to server", e)
}
readResponse(conn)
}
@@ -191,11 +249,9 @@ private[spark] class RestSubmissionClient extends Logging {
}
} catch {
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
- throw new SubmitRestConnectionException(
- s"Unable to connect to server ${connection.getURL}", unreachable)
+ throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
- throw new SubmitRestProtocolException(
- "Malformed response received from server", malformed)
+ throw new SubmitRestProtocolException("Malformed response received from server", malformed)
}
}
@@ -241,13 +297,12 @@ private[spark] class RestSubmissionClient extends Logging {
/** Report the status of a newly created submission. */
private def reportSubmissionStatus(
- master: String,
submitResponse: CreateSubmissionResponse): Unit = {
if (submitResponse.success) {
val submissionId = submitResponse.submissionId
if (submissionId != null) {
logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
- pollSubmissionStatus(master, submissionId)
+ pollSubmissionStatus(submissionId)
} else {
// should never happen
logError("Application successfully submitted, but submission ID was not provided!")
@@ -262,9 +317,9 @@ private[spark] class RestSubmissionClient extends Logging {
* Poll the status of the specified submission and log it.
* This retries up to a fixed number of times before giving up.
*/
- private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
+ private def pollSubmissionStatus(submissionId: String): Unit = {
(1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
- val response = requestSubmissionStatus(master, submissionId, quiet = true)
+ val response = requestSubmissionStatus(submissionId, quiet = true)
val statusResponse = response match {
case s: SubmissionStatusResponse => s
case _ => return // unexpected type, let upstream caller handle it
@@ -302,6 +357,21 @@ private[spark] class RestSubmissionClient extends Logging {
private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
}
+
+ /**
+ * When a connection exception is caught, return true if all masters are lost.
+ * Note that the heuristic used here does not take into account that masters
+ * can recover during the lifetime of this client. This assumption should be
+ * harmless because this client currently does not support retrying submission
+ * on failure yet (SPARK-6443).
+ */
+ private def handleConnectionException(masterUrl: String): Boolean = {
+ if (!lostMasters.contains(masterUrl)) {
+ logWarning(s"Unable to connect to server ${masterUrl}.")
+ lostMasters += masterUrl
+ }
+ lostMasters.size >= masters.size
+ }
}
private[spark] object RestSubmissionClient {
@@ -324,10 +394,10 @@ private[spark] object RestSubmissionClient {
}
val sparkProperties = conf.getAll.toMap
val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
- val client = new RestSubmissionClient
+ val client = new RestSubmissionClient(master)
val submitRequest = client.constructSubmitRequest(
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
- client.createSubmission(master, submitRequest)
+ client.createSubmission(submitRequest)
}
def main(args: Array[String]): Unit = {
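The net effect for callers of `RestSubmissionClient`: the master URL moves from every call into the constructor, where an HA URL is split once via `Utils.parseStandaloneMasterUrls`, and each call fails over internally. A usage sketch follows; since the class is `private[spark]`, it would only compile inside Spark's own source tree, and all resource paths, class names, and property values here are hypothetical:

```scala
package org.apache.spark.deploy.rest

// Hedged sketch of the new call shape; not part of this patch.
object RestClientUsageSketch {
  def main(args: Array[String]): Unit = {
    // One client bound to both masters of an HA pair.
    val client = new RestSubmissionClient("spark://host1:6066,host2:6066")
    val request = client.constructSubmitRequest(
      "hdfs://path/to/app.jar",              // appResource (hypothetical)
      "org.example.Main",                    // mainClass (hypothetical)
      Array("arg1"),                         // appArgs
      Map("spark.master" -> "spark://host1:6066,host2:6066",
          "spark.app.name" -> "example"),    // sparkProperties
      Map.empty)                             // environmentVariables
    // Tries host1 first; on a connection error or a standby reply, moves on
    // to host2, and only rethrows once every master has been lost.
    client.createSubmission(request)
  }
}
```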
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 88f9d880ac..9678631da9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (masters != null) { // Two positional arguments were given
printUsageAndExit(1)
}
- masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
+ masters = Utils.parseStandaloneMasterUrls(value)
parse(tail)
case Nil =>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 844f0cd22d..be4db02ab8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2160,6 +2160,22 @@ private[spark] object Utils extends Logging {
}
/**
+ * Split the comma delimited string of master URLs into a list.
+ * For instance, "spark://abc,def" becomes [spark://abc, spark://def].
+ */
+ def parseStandaloneMasterUrls(masterUrls: String): Array[String] = {
+ masterUrls.stripPrefix("spark://").split(",").map("spark://" + _)
+ }
+
+ /** An identifier that backup masters use in their responses. */
+ val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive"
+
+ /** Return true if the response message is sent from a backup Master on standby. */
+ def responseFromBackup(msg: String): Boolean = {
+ msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)
+ }
+
+ /**
* Adds a shutdown hook with default priority.
*
* @param hook The code to run during shutdown.
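The new `parseStandaloneMasterUrls` helper is simple enough to verify inline. A standalone copy, showing its behavior on the HA-style URL format it expects:

```scala
// Standalone copy of the helper added above, for illustration.
def parseStandaloneMasterUrls(masterUrls: String): Array[String] =
  masterUrls.stripPrefix("spark://").split(",").map("spark://" + _)

val parsed = parseStandaloneMasterUrls("spark://host1:7077,host2:7077")
assert(parsed.sameElements(Array("spark://host1:7077", "spark://host2:7077")))

// Only the leading "spark://" is stripped, so the expected input names
// hosts after the first one without the scheme, as in the example above.
```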
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 0a318a27ac..f4d548d9e7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._
* Tests for the REST application submission protocol used in standalone cluster mode.
*/
class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
- private val client = new RestSubmissionClient
private var actorSystem: Option[ActorSystem] = None
private var server: Option[RestSubmissionServer] = None
@@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val appArgs = Array("one", "two", "three")
val sparkProperties = Map("spark.app.name" -> "pi")
val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX")
- val request = client.constructSubmitRequest(
+ val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest(
"my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables)
assert(request.action === Utils.getFormattedClassName(request))
assert(request.clientSparkVersion === SPARK_VERSION)
@@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val request = constructSubmitRequest(masterUrl, appArgs)
assert(request.appArgs === appArgs)
assert(request.sparkProperties("spark.master") === masterUrl)
- val response = client.createSubmission(masterUrl, request)
+ val response = new RestSubmissionClient(masterUrl).createSubmission(request)
val submitResponse = getSubmitResponse(response)
assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val submissionId = "my-lyft-driver"
val killMessage = "your driver is killed"
val masterUrl = startDummyServer(killMessage = killMessage)
- val response = client.killSubmission(masterUrl, submissionId)
+ val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId)
val killResponse = getKillResponse(response)
assert(killResponse.action === Utils.getFormattedClassName(killResponse))
assert(killResponse.serverSparkVersion === SPARK_VERSION)
@@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val submissionState = KILLED
val submissionException = new Exception("there was an irresponsible mix of alcohol and cars")
val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException))
- val response = client.requestSubmissionStatus(masterUrl, submissionId)
+ val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId)
val statusResponse = getStatusResponse(response)
assert(statusResponse.action === Utils.getFormattedClassName(statusResponse))
assert(statusResponse.serverSparkVersion === SPARK_VERSION)
@@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("create then kill") {
val masterUrl = startSmartServer()
val request = constructSubmitRequest(masterUrl)
- val response1 = client.createSubmission(masterUrl, request)
+ val client = new RestSubmissionClient(masterUrl)
+ val response1 = client.createSubmission(request)
val submitResponse = getSubmitResponse(response1)
assert(submitResponse.success)
assert(submitResponse.submissionId != null)
// kill submission that was just created
val submissionId = submitResponse.submissionId
- val response2 = client.killSubmission(masterUrl, submissionId)
+ val response2 = client.killSubmission(submissionId)
val killResponse = getKillResponse(response2)
assert(killResponse.success)
assert(killResponse.submissionId === submissionId)
@@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("create then request status") {
val masterUrl = startSmartServer()
val request = constructSubmitRequest(masterUrl)
- val response1 = client.createSubmission(masterUrl, request)
+ val client = new RestSubmissionClient(masterUrl)
+ val response1 = client.createSubmission(request)
val submitResponse = getSubmitResponse(response1)
assert(submitResponse.success)
assert(submitResponse.submissionId != null)
// request status of submission that was just created
val submissionId = submitResponse.submissionId
- val response2 = client.requestSubmissionStatus(masterUrl, submissionId)
+ val response2 = client.requestSubmissionStatus(submissionId)
val statusResponse = getStatusResponse(response2)
assert(statusResponse.success)
assert(statusResponse.submissionId === submissionId)
@@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("create then kill then request status") {
val masterUrl = startSmartServer()
val request = constructSubmitRequest(masterUrl)
- val response1 = client.createSubmission(masterUrl, request)
- val response2 = client.createSubmission(masterUrl, request)
+ val client = new RestSubmissionClient(masterUrl)
+ val response1 = client.createSubmission(request)
+ val response2 = client.createSubmission(request)
val submitResponse1 = getSubmitResponse(response1)
val submitResponse2 = getSubmitResponse(response2)
assert(submitResponse1.success)
@@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val submissionId1 = submitResponse1.submissionId
val submissionId2 = submitResponse2.submissionId
// kill only submission 1, but not submission 2
- val response3 = client.killSubmission(masterUrl, submissionId1)
+ val response3 = client.killSubmission(submissionId1)
val killResponse = getKillResponse(response3)
assert(killResponse.success)
assert(killResponse.submissionId === submissionId1)
// request status for both submissions: 1 should be KILLED but 2 should be RUNNING still
- val response4 = client.requestSubmissionStatus(masterUrl, submissionId1)
- val response5 = client.requestSubmissionStatus(masterUrl, submissionId2)
+ val response4 = client.requestSubmissionStatus(submissionId1)
+ val response5 = client.requestSubmissionStatus(submissionId2)
val statusResponse1 = getStatusResponse(response4)
val statusResponse2 = getStatusResponse(response5)
assert(statusResponse1.submissionId === submissionId1)
@@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("kill or request status before create") {
val masterUrl = startSmartServer()
val doesNotExist = "does-not-exist"
+ val client = new RestSubmissionClient(masterUrl)
// kill a non-existent submission
- val response1 = client.killSubmission(masterUrl, doesNotExist)
+ val response1 = client.killSubmission(doesNotExist)
val killResponse = getKillResponse(response1)
assert(!killResponse.success)
assert(killResponse.submissionId === doesNotExist)
// request status for a non-existent submission
- val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist)
+ val response2 = client.requestSubmissionStatus(doesNotExist)
val statusResponse = getStatusResponse(response2)
assert(!statusResponse.success)
assert(statusResponse.submissionId === doesNotExist)
@@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("client handles faulty server") {
val masterUrl = startFaultyServer()
+ val client = new RestSubmissionClient(masterUrl)
val httpUrl = masterUrl.replace("spark://", "http://")
val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
@@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
mainJar) ++ appArgs
val args = new SparkSubmitArguments(commandLineArgs)
val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
- client.constructSubmitRequest(
+ new RestSubmissionClient("spark://host:port").constructSubmitRequest(
mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty)
}
@@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
method: String,
body: String = ""): (SubmitRestProtocolResponse, Int) = {
val conn = sendHttpRequest(url, method, body)
- (client.readResponse(conn), conn.getResponseCode)
+ (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode)
}
}