diff options
author | WangTaoTheTonic <wangtao111@huawei.com> | 2015-05-01 18:38:20 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-01 18:38:20 -0700 |
commit | b4b43df8a338a30c0eadcf10cbe3ba203dc3f861 (patch) | |
tree | 1cce6e72d9c907398ef6ffb89986ae03c8ba729b /core/src/test | |
parent | 2022193412e832393a29b94609841c3ffe8e3d66 (diff) | |
download | spark-b4b43df8a338a30c0eadcf10cbe3ba203dc3f861.tar.gz spark-b4b43df8a338a30c0eadcf10cbe3ba203dc3f861.tar.bz2 spark-b4b43df8a338a30c0eadcf10cbe3ba203dc3f861.zip |
[SPARK-6443] [SPARK SUBMIT] Could not submit app in standalone cluster mode when HA is enabled
**3/26 update:**
* Akka-based:
Use an array of `ActorSelection` to represent multiple master. Add an `activeMasterActor` for query status of driver. And will add lost masters( including the standby one) to `lostMasters`.
When size of `lostMasters` equals or greater than # of all masters, we should give an error that all masters are not avalible.
* Rest-based:
When all masters are not available(throw an exception), we use akka gateway to submit apps.
I have tested simply on standalone HA cluster(with two masters alive and one alive/one dead), it worked.
There might remains some issues on style or message print, but we can check the solution then fix them together.
/cc srowen andrewor14
Author: WangTaoTheTonic <wangtao111@huawei.com>
Closes #5116 from WangTaoTheTonic/SPARK-6443 and squashes the following commits:
2a28aab [WangTaoTheTonic] based the newest change https://github.com/apache/spark/pull/5144
76fd411 [WangTaoTheTonic] rebase
f4f972b [WangTaoTheTonic] rebase...again
a41de0b [WangTaoTheTonic] rebase
220cb3c [WangTaoTheTonic] move connect exception inside
35119a0 [WangTaoTheTonic] style and compile issues
9d636be [WangTaoTheTonic] per Andrew's comments
979760c [WangTaoTheTonic] rebase
e4f4ece [WangTaoTheTonic] fix failed test
5d23958 [WangTaoTheTonic] refact some duplicated code, style and comments
7a881b3 [WangTaoTheTonic] when one of masters is gone, we still can submit
2b011c9 [WangTaoTheTonic] fix broken tests
60d97a4 [WangTaoTheTonic] rebase
fa1fa80 [WangTaoTheTonic] submit app to HA cluster in standalone cluster mode
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 0a318a27ac..f4d548d9e7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._ * Tests for the REST application submission protocol used in standalone cluster mode. */ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { - private val client = new RestSubmissionClient private var actorSystem: Option[ActorSystem] = None private var server: Option[RestSubmissionServer] = None @@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val appArgs = Array("one", "two", "three") val sparkProperties = Map("spark.app.name" -> "pi") val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX") - val request = client.constructSubmitRequest( + val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest( "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables) assert(request.action === Utils.getFormattedClassName(request)) assert(request.clientSparkVersion === SPARK_VERSION) @@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val request = constructSubmitRequest(masterUrl, appArgs) assert(request.appArgs === appArgs) assert(request.sparkProperties("spark.master") === masterUrl) - val response = client.createSubmission(masterUrl, request) + val response = new RestSubmissionClient(masterUrl).createSubmission(request) val submitResponse = getSubmitResponse(response) assert(submitResponse.action === Utils.getFormattedClassName(submitResponse)) assert(submitResponse.serverSparkVersion === SPARK_VERSION) @@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId = "my-lyft-driver" val killMessage = "your driver is killed" val masterUrl = startDummyServer(killMessage = killMessage) - val response = client.killSubmission(masterUrl, submissionId) + val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId) val killResponse = getKillResponse(response) assert(killResponse.action === Utils.getFormattedClassName(killResponse)) assert(killResponse.serverSparkVersion === SPARK_VERSION) @@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionState = KILLED val submissionException = new Exception("there was an irresponsible mix of alcohol and cars") val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException)) - val response = client.requestSubmissionStatus(masterUrl, submissionId) + val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response) assert(statusResponse.action === Utils.getFormattedClassName(statusResponse)) assert(statusResponse.serverSparkVersion === SPARK_VERSION) @@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) + val client = new RestSubmissionClient(masterUrl) + val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) assert(submitResponse.submissionId != null) // kill submission that was just created val submissionId = submitResponse.submissionId - val response2 = client.killSubmission(masterUrl, submissionId) + val response2 = client.killSubmission(submissionId) val killResponse = getKillResponse(response2) assert(killResponse.success) assert(killResponse.submissionId === submissionId) @@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) + val client = new RestSubmissionClient(masterUrl) + val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) assert(submitResponse.submissionId != null) // request status of submission that was just created val submissionId = submitResponse.submissionId - val response2 = client.requestSubmissionStatus(masterUrl, submissionId) + val response2 = client.requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response2) assert(statusResponse.success) assert(statusResponse.submissionId === submissionId) @@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) - val response2 = client.createSubmission(masterUrl, request) + val client = new RestSubmissionClient(masterUrl) + val response1 = client.createSubmission(request) + val response2 = client.createSubmission(request) val submitResponse1 = getSubmitResponse(response1) val submitResponse2 = getSubmitResponse(response2) assert(submitResponse1.success) @@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId1 = submitResponse1.submissionId val submissionId2 = submitResponse2.submissionId // kill only submission 1, but not submission 2 - val response3 = client.killSubmission(masterUrl, submissionId1) + val response3 = client.killSubmission(submissionId1) val killResponse = getKillResponse(response3) assert(killResponse.success) assert(killResponse.submissionId === submissionId1) // request status for both submissions: 1 should be KILLED but 2 should be RUNNING still - val response4 = client.requestSubmissionStatus(masterUrl, submissionId1) - val response5 = client.requestSubmissionStatus(masterUrl, submissionId2) + val response4 = client.requestSubmissionStatus(submissionId1) + val response5 = client.requestSubmissionStatus(submissionId2) val statusResponse1 = getStatusResponse(response4) val statusResponse2 = getStatusResponse(response5) assert(statusResponse1.submissionId === submissionId1) @@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("kill or request status before create") { val masterUrl = startSmartServer() val doesNotExist = "does-not-exist" + val client = new RestSubmissionClient(masterUrl) // kill a non-existent submission - val response1 = client.killSubmission(masterUrl, doesNotExist) + val response1 = client.killSubmission(doesNotExist) val killResponse = getKillResponse(response1) assert(!killResponse.success) assert(killResponse.submissionId === doesNotExist) // request status for a non-existent submission - val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist) + val response2 = client.requestSubmissionStatus(doesNotExist) val statusResponse = getStatusResponse(response2) assert(!statusResponse.success) assert(statusResponse.submissionId === doesNotExist) @@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("client handles faulty server") { val masterUrl = startFaultyServer() + val client = new RestSubmissionClient(masterUrl) val httpUrl = masterUrl.replace("spark://", "http://") val v = RestSubmissionServer.PROTOCOL_VERSION val submitRequestPath = s"$httpUrl/$v/submissions/create" @@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { mainJar) ++ appArgs val args = new SparkSubmitArguments(commandLineArgs) val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) - client.constructSubmitRequest( + new RestSubmissionClient("spark://host:port").constructSubmitRequest( mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) } @@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { method: String, body: String = ""): (SubmitRestProtocolResponse, Int) = { val conn = sendHttpRequest(url, method, body) - (client.readResponse(conn), conn.getResponseCode) + (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode) } } |