From b4b43df8a338a30c0eadcf10cbe3ba203dc3f861 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 1 May 2015 18:38:20 -0700 Subject: [SPARK-6443] [SPARK SUBMIT] Could not submit app in standalone cluster mode when HA is enabled **3/26 update:** * Akka-based: Use an array of `ActorSelection` to represent multiple master. Add an `activeMasterActor` for query status of driver. And will add lost masters( including the standby one) to `lostMasters`. When size of `lostMasters` equals or greater than # of all masters, we should give an error that all masters are not avalible. * Rest-based: When all masters are not available(throw an exception), we use akka gateway to submit apps. I have tested simply on standalone HA cluster(with two masters alive and one alive/one dead), it worked. There might remains some issues on style or message print, but we can check the solution then fix them together. /cc srowen andrewor14 Author: WangTaoTheTonic Closes #5116 from WangTaoTheTonic/SPARK-6443 and squashes the following commits: 2a28aab [WangTaoTheTonic] based the newest change https://github.com/apache/spark/pull/5144 76fd411 [WangTaoTheTonic] rebase f4f972b [WangTaoTheTonic] rebase...again a41de0b [WangTaoTheTonic] rebase 220cb3c [WangTaoTheTonic] move connect exception inside 35119a0 [WangTaoTheTonic] style and compile issues 9d636be [WangTaoTheTonic] per Andrew's comments 979760c [WangTaoTheTonic] rebase e4f4ece [WangTaoTheTonic] fix failed test 5d23958 [WangTaoTheTonic] refact some duplicated code, style and comments 7a881b3 [WangTaoTheTonic] when one of masters is gone, we still can submit 2b011c9 [WangTaoTheTonic] fix broken tests 60d97a4 [WangTaoTheTonic] rebase fa1fa80 [WangTaoTheTonic] submit app to HA cluster in standalone cluster mode --- .../scala/org/apache/spark/deploy/Client.scala | 73 +++++++--- .../org/apache/spark/deploy/ClientArguments.scala | 9 +- .../org/apache/spark/deploy/SparkSubmit.scala | 8 +- .../org/apache/spark/deploy/master/Master.scala | 24 ++-- .../spark/deploy/rest/RestSubmissionClient.scala | 152 +++++++++++++++------ .../spark/deploy/worker/WorkerArguments.scala | 2 +- .../main/scala/org/apache/spark/util/Utils.scala | 16 +++ .../deploy/rest/StandaloneRestSubmitSuite.scala | 40 +++--- 8 files changed, 229 insertions(+), 95 deletions(-) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index c2c3e9a9e4..848b62f9de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy +import scala.collection.mutable.HashSet import scala.concurrent._ import akka.actor._ @@ -31,21 +32,24 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils} /** * Proxy that relays messages to the driver. + * + * We currently don't support retry if submission fails. In HA mode, client will submit request to + * all masters and see which one could handle it. */ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { - var masterActor: ActorSelection = _ + private val masterActors = driverArgs.masters.map { m => + context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system))) + } + private val lostMasters = new HashSet[Address] + private var activeMasterActor: ActorSelection = null + val timeout = RpcUtils.askTimeout(conf) override def preStart(): Unit = { - masterActor = context.actorSelection( - Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system))) - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}") - driverArgs.cmd match { case "launch" => // TODO: We could add an env variable here and intercept it in `sc.addJar` that would @@ -79,11 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.supervise, command) - masterActor ! RequestSubmitDriver(driverDescription) + // This assumes only one Master is active at a time + for (masterActor <- masterActors) { + masterActor ! RequestSubmitDriver(driverDescription) + } case "kill" => val driverId = driverArgs.driverId - masterActor ! RequestKillDriver(driverId) + // This assumes only one Master is active at a time + for (masterActor <- masterActors) { + masterActor ! RequestKillDriver(driverId) + } } } @@ -92,10 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) println("... waiting before polling master for driver state") Thread.sleep(5000) println("... polling master for driver state") - val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout) + val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout) .mapTo[DriverStatusResponse] val statusResponse = Await.result(statusFuture, timeout) - statusResponse.found match { case false => println(s"ERROR: Cluster master did not recognize $driverId") @@ -122,20 +131,46 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) case SubmitDriverResponse(success, driverId, message) => println(message) - if (success) pollAndReportStatus(driverId.get) else System.exit(-1) + if (success) { + activeMasterActor = context.actorSelection(sender.path) + pollAndReportStatus(driverId.get) + } else if (!Utils.responseFromBackup(message)) { + System.exit(-1) + } + case KillDriverResponse(driverId, success, message) => println(message) - if (success) pollAndReportStatus(driverId) else System.exit(-1) + if (success) { + activeMasterActor = context.actorSelection(sender.path) + pollAndReportStatus(driverId) + } else if (!Utils.responseFromBackup(message)) { + System.exit(-1) + } case DisassociatedEvent(_, remoteAddress, _) => - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") - System.exit(-1) + if (!lostMasters.contains(remoteAddress)) { + println(s"Error connecting to master $remoteAddress.") + lostMasters += remoteAddress + // Note that this heuristic does not account for the fact that a Master can recover within + // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This + // is not currently a concern, however, because this client does not retry submissions. + if (lostMasters.size >= masterActors.size) { + println("No master is available, exiting.") + System.exit(-1) + } + } case AssociationErrorEvent(cause, _, remoteAddress, _, _) => - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") - println(s"Cause was: $cause") - System.exit(-1) + if (!lostMasters.contains(remoteAddress)) { + println(s"Error connecting to master ($remoteAddress).") + println(s"Cause was: $cause") + lostMasters += remoteAddress + if (lostMasters.size >= masterActors.size) { + println("No master is available, exiting.") + System.exit(-1) + } + } } } @@ -163,7 +198,9 @@ object Client { "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely - Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem)) + for (m <- driverArgs.masters) { + Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem)) + } actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 5cbac787dc..316e2d59f0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -22,8 +22,7 @@ import java.net.{URI, URISyntaxException} import scala.collection.mutable.ListBuffer import org.apache.log4j.Level - -import org.apache.spark.util.{IntParam, MemoryParam} +import org.apache.spark.util.{IntParam, MemoryParam, Utils} /** * Command-line parser for the driver client. @@ -35,7 +34,7 @@ private[deploy] class ClientArguments(args: Array[String]) { var logLevel = Level.WARN // launch parameters - var master: String = "" + var masters: Array[String] = null var jarUrl: String = "" var mainClass: String = "" var supervise: Boolean = DEFAULT_SUPERVISE @@ -80,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) { } jarUrl = _jarUrl - master = _master + masters = Utils.parseStandaloneMasterUrls(_master) mainClass = _mainClass _driverOptions ++= tail case "kill" :: _master :: _driverId :: tail => cmd = "kill" - master = _master + masters = Utils.parseStandaloneMasterUrls(_master) driverId = _driverId case _ => diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index af38bf80e4..42b5d41b7b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -118,8 +118,8 @@ object SparkSubmit { * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only. */ private def kill(args: SparkSubmitArguments): Unit = { - new RestSubmissionClient() - .killSubmission(args.master, args.submissionToKill) + new RestSubmissionClient(args.master) + .killSubmission(args.submissionToKill) } /** @@ -127,8 +127,8 @@ object SparkSubmit { * Standalone and Mesos cluster mode only. */ private def requestStatus(args: SparkSubmitArguments): Unit = { - new RestSubmissionClient() - .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor) + new RestSubmissionClient(args.master) + .requestSubmissionStatus(args.submissionToRequestStatusFor) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index dc6077f3d1..0fac3cdcf5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -254,7 +254,8 @@ private[master] class Master( case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) { - val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + + "Can only accept driver submissions in ALIVE state." sender ! SubmitDriverResponse(false, None, msg) } else { logInfo("Driver submitted " + description.command.mainClass) @@ -274,7 +275,8 @@ private[master] class Master( case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { - val msg = s"Can only kill drivers in ALIVE state. Current state: $state." + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + + s"Can only kill drivers in ALIVE state." sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) @@ -305,12 +307,18 @@ private[master] class Master( } case RequestDriverStatus(driverId) => { - (drivers ++ completedDrivers).find(_.id == driverId) match { - case Some(driver) => - sender ! DriverStatusResponse(found = true, Some(driver.state), - driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) - case None => - sender ! DriverStatusResponse(found = false, None, None, None, None) + if (state != RecoveryState.ALIVE) { + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + + "Can only request driver status in ALIVE state." + sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))) + } else { + (drivers ++ completedDrivers).find(_.id == driverId) match { + case Some(driver) => + sender ! DriverStatusResponse(found = true, Some(driver.state), + driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) + case None => + sender ! DriverStatusResponse(found = false, None, None, None, None) + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 307cebfb4b..6078f50518 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -18,9 +18,10 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, FileNotFoundException} -import java.net.{HttpURLConnection, SocketException, URL} +import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import javax.servlet.http.HttpServletResponse +import scala.collection.mutable import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException @@ -51,57 +52,109 @@ import org.apache.spark.util.Utils * implementation of this client can use that information to retry using the version specified * by the server. */ -private[spark] class RestSubmissionClient extends Logging { +private[spark] class RestSubmissionClient(master: String) extends Logging { import RestSubmissionClient._ private val supportedMasterPrefixes = Seq("spark://", "mesos://") + private val masters: Array[String] = Utils.parseStandaloneMasterUrls(master) + + // Set of masters that lost contact with us, used to keep track of + // whether there are masters still alive for us to communicate with + private val lostMasters = new mutable.HashSet[String] + /** * Submit an application specified by the parameters in the provided request. * * If the submission was successful, poll the status of the submission and report * it to the user. Otherwise, report the error message provided by the server. */ - def createSubmission( - master: String, - request: CreateSubmissionRequest): SubmitRestProtocolResponse = { + def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = { logInfo(s"Submitting a request to launch an application in $master.") - validateMaster(master) - val url = getSubmitUrl(master) - val response = postJson(url, request.toJson) - response match { - case s: CreateSubmissionResponse => - reportSubmissionStatus(master, s) - handleRestResponse(s) - case unexpected => - handleUnexpectedRestResponse(unexpected) + var handled: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters if !handled) { + validateMaster(m) + val url = getSubmitUrl(m) + try { + response = postJson(url, request.toJson) + response match { + case s: CreateSubmissionResponse => + if (s.success) { + reportSubmissionStatus(s) + handleRestResponse(s) + handled = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e: SubmitRestConnectionException => + if (handleConnectionException(m)) { + throw new SubmitRestConnectionException("Unable to connect to server", e) + } + } } response } /** Request that the server kill the specified submission. */ - def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = { + def killSubmission(submissionId: String): SubmitRestProtocolResponse = { logInfo(s"Submitting a request to kill submission $submissionId in $master.") - validateMaster(master) - val response = post(getKillUrl(master, submissionId)) - response match { - case k: KillSubmissionResponse => handleRestResponse(k) - case unexpected => handleUnexpectedRestResponse(unexpected) + var handled: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters if !handled) { + validateMaster(m) + val url = getKillUrl(m, submissionId) + try { + response = post(url) + response match { + case k: KillSubmissionResponse => + if (!Utils.responseFromBackup(k.message)) { + handleRestResponse(k) + handled = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e: SubmitRestConnectionException => + if (handleConnectionException(m)) { + throw new SubmitRestConnectionException("Unable to connect to server", e) + } + } } response } /** Request the status of a submission from the server. */ def requestSubmissionStatus( - master: String, submissionId: String, quiet: Boolean = false): SubmitRestProtocolResponse = { logInfo(s"Submitting a request for the status of submission $submissionId in $master.") - validateMaster(master) - val response = get(getStatusUrl(master, submissionId)) - response match { - case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) } - case unexpected => handleUnexpectedRestResponse(unexpected) + + var handled: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters if !handled) { + validateMaster(m) + val url = getStatusUrl(m, submissionId) + try { + response = get(url) + response match { + case s: SubmissionStatusResponse if s.success => + if (!quiet) { + handleRestResponse(s) + } + handled = true + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e: SubmitRestConnectionException => + if (handleConnectionException(m)) { + throw new SubmitRestConnectionException("Unable to connect to server", e) + } + } } response } @@ -148,11 +201,16 @@ private[spark] class RestSubmissionClient extends Logging { conn.setRequestProperty("Content-Type", "application/json") conn.setRequestProperty("charset", "utf-8") conn.setDoOutput(true) - val out = new DataOutputStream(conn.getOutputStream) - Utils.tryWithSafeFinally { - out.write(json.getBytes(Charsets.UTF_8)) - } { - out.close() + try { + val out = new DataOutputStream(conn.getOutputStream) + Utils.tryWithSafeFinally { + out.write(json.getBytes(Charsets.UTF_8)) + } { + out.close() + } + } catch { + case e: ConnectException => + throw new SubmitRestConnectionException("Connect Exception when connect to server", e) } readResponse(conn) } @@ -191,11 +249,9 @@ private[spark] class RestSubmissionClient extends Logging { } } catch { case unreachable @ (_: FileNotFoundException | _: SocketException) => - throw new SubmitRestConnectionException( - s"Unable to connect to server ${connection.getURL}", unreachable) + throw new SubmitRestConnectionException("Unable to connect to server", unreachable) case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => - throw new SubmitRestProtocolException( - "Malformed response received from server", malformed) + throw new SubmitRestProtocolException("Malformed response received from server", malformed) } } @@ -241,13 +297,12 @@ private[spark] class RestSubmissionClient extends Logging { /** Report the status of a newly created submission. */ private def reportSubmissionStatus( - master: String, submitResponse: CreateSubmissionResponse): Unit = { if (submitResponse.success) { val submissionId = submitResponse.submissionId if (submissionId != null) { logInfo(s"Submission successfully created as $submissionId. Polling submission state...") - pollSubmissionStatus(master, submissionId) + pollSubmissionStatus(submissionId) } else { // should never happen logError("Application successfully submitted, but submission ID was not provided!") @@ -262,9 +317,9 @@ private[spark] class RestSubmissionClient extends Logging { * Poll the status of the specified submission and log it. * This retries up to a fixed number of times before giving up. */ - private def pollSubmissionStatus(master: String, submissionId: String): Unit = { + private def pollSubmissionStatus(submissionId: String): Unit = { (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ => - val response = requestSubmissionStatus(master, submissionId, quiet = true) + val response = requestSubmissionStatus(submissionId, quiet = true) val statusResponse = response match { case s: SubmissionStatusResponse => s case _ => return // unexpected type, let upstream caller handle it @@ -302,6 +357,21 @@ private[spark] class RestSubmissionClient extends Logging { private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = { logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.") } + + /** + * When a connection exception is caught, return true if all masters are lost. + * Note that the heuristic used here does not take into account that masters + * can recover during the lifetime of this client. This assumption should be + * harmless because this client currently does not support retrying submission + * on failure yet (SPARK-6443). + */ + private def handleConnectionException(masterUrl: String): Boolean = { + if (!lostMasters.contains(masterUrl)) { + logWarning(s"Unable to connect to server ${masterUrl}.") + lostMasters += masterUrl + } + lostMasters.size >= masters.size + } } private[spark] object RestSubmissionClient { @@ -324,10 +394,10 @@ private[spark] object RestSubmissionClient { } val sparkProperties = conf.getAll.toMap val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") } - val client = new RestSubmissionClient + val client = new RestSubmissionClient(master) val submitRequest = client.constructSubmitRequest( appResource, mainClass, appArgs, sparkProperties, environmentVariables) - client.createSubmission(master, submitRequest) + client.createSubmission(submitRequest) } def main(args: Array[String]): Unit = { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 88f9d880ac..9678631da9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { if (masters != null) { // Two positional arguments were given printUsageAndExit(1) } - masters = value.stripPrefix("spark://").split(",").map("spark://" + _) + masters = Utils.parseStandaloneMasterUrls(value) parse(tail) case Nil => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 844f0cd22d..be4db02ab8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2159,6 +2159,22 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } + /** + * Split the comma delimited string of master URLs into a list. + * For instance, "spark://abc,def" becomes [spark://abc, spark://def]. + */ + def parseStandaloneMasterUrls(masterUrls: String): Array[String] = { + masterUrls.stripPrefix("spark://").split(",").map("spark://" + _) + } + + /** An identifier that backup masters use in their responses. */ + val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive" + + /** Return true if the response message is sent from a backup Master on standby. */ + def responseFromBackup(msg: String): Boolean = { + msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX) + } + /** * Adds a shutdown hook with default priority. * diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 0a318a27ac..f4d548d9e7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._ * Tests for the REST application submission protocol used in standalone cluster mode. */ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { - private val client = new RestSubmissionClient private var actorSystem: Option[ActorSystem] = None private var server: Option[RestSubmissionServer] = None @@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val appArgs = Array("one", "two", "three") val sparkProperties = Map("spark.app.name" -> "pi") val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX") - val request = client.constructSubmitRequest( + val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest( "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables) assert(request.action === Utils.getFormattedClassName(request)) assert(request.clientSparkVersion === SPARK_VERSION) @@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val request = constructSubmitRequest(masterUrl, appArgs) assert(request.appArgs === appArgs) assert(request.sparkProperties("spark.master") === masterUrl) - val response = client.createSubmission(masterUrl, request) + val response = new RestSubmissionClient(masterUrl).createSubmission(request) val submitResponse = getSubmitResponse(response) assert(submitResponse.action === Utils.getFormattedClassName(submitResponse)) assert(submitResponse.serverSparkVersion === SPARK_VERSION) @@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId = "my-lyft-driver" val killMessage = "your driver is killed" val masterUrl = startDummyServer(killMessage = killMessage) - val response = client.killSubmission(masterUrl, submissionId) + val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId) val killResponse = getKillResponse(response) assert(killResponse.action === Utils.getFormattedClassName(killResponse)) assert(killResponse.serverSparkVersion === SPARK_VERSION) @@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionState = KILLED val submissionException = new Exception("there was an irresponsible mix of alcohol and cars") val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException)) - val response = client.requestSubmissionStatus(masterUrl, submissionId) + val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response) assert(statusResponse.action === Utils.getFormattedClassName(statusResponse)) assert(statusResponse.serverSparkVersion === SPARK_VERSION) @@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) + val client = new RestSubmissionClient(masterUrl) + val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) assert(submitResponse.submissionId != null) // kill submission that was just created val submissionId = submitResponse.submissionId - val response2 = client.killSubmission(masterUrl, submissionId) + val response2 = client.killSubmission(submissionId) val killResponse = getKillResponse(response2) assert(killResponse.success) assert(killResponse.submissionId === submissionId) @@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) + val client = new RestSubmissionClient(masterUrl) + val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) assert(submitResponse.submissionId != null) // request status of submission that was just created val submissionId = submitResponse.submissionId - val response2 = client.requestSubmissionStatus(masterUrl, submissionId) + val response2 = client.requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response2) assert(statusResponse.success) assert(statusResponse.submissionId === submissionId) @@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) - val response2 = client.createSubmission(masterUrl, request) + val client = new RestSubmissionClient(masterUrl) + val response1 = client.createSubmission(request) + val response2 = client.createSubmission(request) val submitResponse1 = getSubmitResponse(response1) val submitResponse2 = getSubmitResponse(response2) assert(submitResponse1.success) @@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId1 = submitResponse1.submissionId val submissionId2 = submitResponse2.submissionId // kill only submission 1, but not submission 2 - val response3 = client.killSubmission(masterUrl, submissionId1) + val response3 = client.killSubmission(submissionId1) val killResponse = getKillResponse(response3) assert(killResponse.success) assert(killResponse.submissionId === submissionId1) // request status for both submissions: 1 should be KILLED but 2 should be RUNNING still - val response4 = client.requestSubmissionStatus(masterUrl, submissionId1) - val response5 = client.requestSubmissionStatus(masterUrl, submissionId2) + val response4 = client.requestSubmissionStatus(submissionId1) + val response5 = client.requestSubmissionStatus(submissionId2) val statusResponse1 = getStatusResponse(response4) val statusResponse2 = getStatusResponse(response5) assert(statusResponse1.submissionId === submissionId1) @@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("kill or request status before create") { val masterUrl = startSmartServer() val doesNotExist = "does-not-exist" + val client = new RestSubmissionClient(masterUrl) // kill a non-existent submission - val response1 = client.killSubmission(masterUrl, doesNotExist) + val response1 = client.killSubmission(doesNotExist) val killResponse = getKillResponse(response1) assert(!killResponse.success) assert(killResponse.submissionId === doesNotExist) // request status for a non-existent submission - val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist) + val response2 = client.requestSubmissionStatus(doesNotExist) val statusResponse = getStatusResponse(response2) assert(!statusResponse.success) assert(statusResponse.submissionId === doesNotExist) @@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("client handles faulty server") { val masterUrl = startFaultyServer() + val client = new RestSubmissionClient(masterUrl) val httpUrl = masterUrl.replace("spark://", "http://") val v = RestSubmissionServer.PROTOCOL_VERSION val submitRequestPath = s"$httpUrl/$v/submissions/create" @@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { mainJar) ++ appArgs val args = new SparkSubmitArguments(commandLineArgs) val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) - client.constructSubmitRequest( + new RestSubmissionClient("spark://host:port").constructSubmitRequest( mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) } @@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { method: String, body: String = ""): (SubmitRestProtocolResponse, Int) = { val conn = sendHttpRequest(url, method, body) - (client.readResponse(conn), conn.getResponseCode) + (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode) } } -- cgit v1.2.3