aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorWangTaoTheTonic <wangtao111@huawei.com>2015-05-01 18:38:20 -0700
committerAndrew Or <andrew@databricks.com>2015-05-01 18:38:20 -0700
commitb4b43df8a338a30c0eadcf10cbe3ba203dc3f861 (patch)
tree1cce6e72d9c907398ef6ffb89986ae03c8ba729b /core/src/test
parent2022193412e832393a29b94609841c3ffe8e3d66 (diff)
downloadspark-b4b43df8a338a30c0eadcf10cbe3ba203dc3f861.tar.gz
spark-b4b43df8a338a30c0eadcf10cbe3ba203dc3f861.tar.bz2
spark-b4b43df8a338a30c0eadcf10cbe3ba203dc3f861.zip
[SPARK-6443] [SPARK SUBMIT] Could not submit app in standalone cluster mode when HA is enabled
**3/26 update:** * Akka-based: Use an array of `ActorSelection` to represent multiple master. Add an `activeMasterActor` for query status of driver. And will add lost masters( including the standby one) to `lostMasters`. When size of `lostMasters` equals or greater than # of all masters, we should give an error that all masters are not avalible. * Rest-based: When all masters are not available(throw an exception), we use akka gateway to submit apps. I have tested simply on standalone HA cluster(with two masters alive and one alive/one dead), it worked. There might remains some issues on style or message print, but we can check the solution then fix them together. /cc srowen andrewor14 Author: WangTaoTheTonic <wangtao111@huawei.com> Closes #5116 from WangTaoTheTonic/SPARK-6443 and squashes the following commits: 2a28aab [WangTaoTheTonic] based the newest change https://github.com/apache/spark/pull/5144 76fd411 [WangTaoTheTonic] rebase f4f972b [WangTaoTheTonic] rebase...again a41de0b [WangTaoTheTonic] rebase 220cb3c [WangTaoTheTonic] move connect exception inside 35119a0 [WangTaoTheTonic] style and compile issues 9d636be [WangTaoTheTonic] per Andrew's comments 979760c [WangTaoTheTonic] rebase e4f4ece [WangTaoTheTonic] fix failed test 5d23958 [WangTaoTheTonic] refact some duplicated code, style and comments 7a881b3 [WangTaoTheTonic] when one of masters is gone, we still can submit 2b011c9 [WangTaoTheTonic] fix broken tests 60d97a4 [WangTaoTheTonic] rebase fa1fa80 [WangTaoTheTonic] submit app to HA cluster in standalone cluster mode
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala40
1 files changed, 22 insertions, 18 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 0a318a27ac..f4d548d9e7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._
* Tests for the REST application submission protocol used in standalone cluster mode.
*/
class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
- private val client = new RestSubmissionClient
private var actorSystem: Option[ActorSystem] = None
private var server: Option[RestSubmissionServer] = None
@@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val appArgs = Array("one", "two", "three")
val sparkProperties = Map("spark.app.name" -> "pi")
val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX")
- val request = client.constructSubmitRequest(
+ val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest(
"my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables)
assert(request.action === Utils.getFormattedClassName(request))
assert(request.clientSparkVersion === SPARK_VERSION)
@@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val request = constructSubmitRequest(masterUrl, appArgs)
assert(request.appArgs === appArgs)
assert(request.sparkProperties("spark.master") === masterUrl)
- val response = client.createSubmission(masterUrl, request)
+ val response = new RestSubmissionClient(masterUrl).createSubmission(request)
val submitResponse = getSubmitResponse(response)
assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val submissionId = "my-lyft-driver"
val killMessage = "your driver is killed"
val masterUrl = startDummyServer(killMessage = killMessage)
- val response = client.killSubmission(masterUrl, submissionId)
+ val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId)
val killResponse = getKillResponse(response)
assert(killResponse.action === Utils.getFormattedClassName(killResponse))
assert(killResponse.serverSparkVersion === SPARK_VERSION)
@@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val submissionState = KILLED
val submissionException = new Exception("there was an irresponsible mix of alcohol and cars")
val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException))
- val response = client.requestSubmissionStatus(masterUrl, submissionId)
+ val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId)
val statusResponse = getStatusResponse(response)
assert(statusResponse.action === Utils.getFormattedClassName(statusResponse))
assert(statusResponse.serverSparkVersion === SPARK_VERSION)
@@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("create then kill") {
val masterUrl = startSmartServer()
val request = constructSubmitRequest(masterUrl)
- val response1 = client.createSubmission(masterUrl, request)
+ val client = new RestSubmissionClient(masterUrl)
+ val response1 = client.createSubmission(request)
val submitResponse = getSubmitResponse(response1)
assert(submitResponse.success)
assert(submitResponse.submissionId != null)
// kill submission that was just created
val submissionId = submitResponse.submissionId
- val response2 = client.killSubmission(masterUrl, submissionId)
+ val response2 = client.killSubmission(submissionId)
val killResponse = getKillResponse(response2)
assert(killResponse.success)
assert(killResponse.submissionId === submissionId)
@@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("create then request status") {
val masterUrl = startSmartServer()
val request = constructSubmitRequest(masterUrl)
- val response1 = client.createSubmission(masterUrl, request)
+ val client = new RestSubmissionClient(masterUrl)
+ val response1 = client.createSubmission(request)
val submitResponse = getSubmitResponse(response1)
assert(submitResponse.success)
assert(submitResponse.submissionId != null)
// request status of submission that was just created
val submissionId = submitResponse.submissionId
- val response2 = client.requestSubmissionStatus(masterUrl, submissionId)
+ val response2 = client.requestSubmissionStatus(submissionId)
val statusResponse = getStatusResponse(response2)
assert(statusResponse.success)
assert(statusResponse.submissionId === submissionId)
@@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("create then kill then request status") {
val masterUrl = startSmartServer()
val request = constructSubmitRequest(masterUrl)
- val response1 = client.createSubmission(masterUrl, request)
- val response2 = client.createSubmission(masterUrl, request)
+ val client = new RestSubmissionClient(masterUrl)
+ val response1 = client.createSubmission(request)
+ val response2 = client.createSubmission(request)
val submitResponse1 = getSubmitResponse(response1)
val submitResponse2 = getSubmitResponse(response2)
assert(submitResponse1.success)
@@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val submissionId1 = submitResponse1.submissionId
val submissionId2 = submitResponse2.submissionId
// kill only submission 1, but not submission 2
- val response3 = client.killSubmission(masterUrl, submissionId1)
+ val response3 = client.killSubmission(submissionId1)
val killResponse = getKillResponse(response3)
assert(killResponse.success)
assert(killResponse.submissionId === submissionId1)
// request status for both submissions: 1 should be KILLED but 2 should be RUNNING still
- val response4 = client.requestSubmissionStatus(masterUrl, submissionId1)
- val response5 = client.requestSubmissionStatus(masterUrl, submissionId2)
+ val response4 = client.requestSubmissionStatus(submissionId1)
+ val response5 = client.requestSubmissionStatus(submissionId2)
val statusResponse1 = getStatusResponse(response4)
val statusResponse2 = getStatusResponse(response5)
assert(statusResponse1.submissionId === submissionId1)
@@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("kill or request status before create") {
val masterUrl = startSmartServer()
val doesNotExist = "does-not-exist"
+ val client = new RestSubmissionClient(masterUrl)
// kill a non-existent submission
- val response1 = client.killSubmission(masterUrl, doesNotExist)
+ val response1 = client.killSubmission(doesNotExist)
val killResponse = getKillResponse(response1)
assert(!killResponse.success)
assert(killResponse.submissionId === doesNotExist)
// request status for a non-existent submission
- val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist)
+ val response2 = client.requestSubmissionStatus(doesNotExist)
val statusResponse = getStatusResponse(response2)
assert(!statusResponse.success)
assert(statusResponse.submissionId === doesNotExist)
@@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("client handles faulty server") {
val masterUrl = startFaultyServer()
+ val client = new RestSubmissionClient(masterUrl)
val httpUrl = masterUrl.replace("spark://", "http://")
val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
@@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
mainJar) ++ appArgs
val args = new SparkSubmitArguments(commandLineArgs)
val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
- client.constructSubmitRequest(
+ new RestSubmissionClient("spark://host:port").constructSubmitRequest(
mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty)
}
@@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
method: String,
body: String = ""): (SubmitRestProtocolResponse, Int) = {
val conn = sendHttpRequest(url, method, body)
- (client.readResponse(conn), conn.getResponseCode)
+ (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode)
}
}