author    Aaron Davidson <aaron@databricks.com>    2014-10-13 23:31:37 -0700
committer Andrew Or <andrewor14@gmail.com>         2014-10-13 23:31:37 -0700
commit    186b497c945cc7bbe7a21fef56a948dd1fd10774 (patch)
tree      0ae5fd469a8b882e52b77a1348d6484d62c3a61e
parent    4d26aca770f7dd50eee1ed7855e9eda68b5a7ffa (diff)
[SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode
The goal of this patch is to fix the swapped arguments in standalone mode, which was caused by
https://github.com/apache/spark/commit/79e45c9323455a51f25ed9acd0edd8682b4bbb88#diff-79391110e9f26657e415aa169a004998R153.
More details can be found in the JIRA: [SPARK-3921](https://issues.apache.org/jira/browse/SPARK-3921)

Tested in Standalone mode, but not in Mesos.

Author: Aaron Davidson <aaron@databricks.com>

Closes #2779 from aarondav/fix-standalone and squashes the following commits:

725227a [Aaron Davidson] Fix ExecutorRunnerTest
9d703fe [Aaron Davidson] [SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode
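For context: in standalone mode the Worker, not the driver, knows the executor ID, hostname, and core count at launch time, so the scheduler backend ships a command template containing {{...}} placeholders and ExecutorRunner fills them in when it starts the executor. The sketch below is a minimal, self-contained illustration of that substitution idea; the object name and all values are invented, and only the placeholder names come from this patch.

// Hedged sketch of placeholder substitution; keys mirror the templates used in
// this patch, but the object name and values are hypothetical.
object SubstitutionSketch {
  def substitute(values: Map[String, String])(arg: String): String =
    values.getOrElse(arg, arg)

  def main(args: Array[String]): Unit = {
    val template = Seq("<driverUrl>", "{{EXECUTOR_ID}}", "{{HOSTNAME}}",
      "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}")
    val values = Map(
      "{{EXECUTOR_ID}}" -> "3",
      "{{HOSTNAME}}" -> "worker-1",
      "{{CORES}}" -> "8",
      "{{APP_ID}}" -> "app-20141013233137-0000",
      "{{WORKER_URL}}" -> "akka.tcp://sparkWorker@worker-1:34567/user/Worker")
    // Prints the fully substituted argument list in the order the backend expects.
    println(template.map(substitute(values)).mkString(" "))
  }
}

The patch makes the application ID one more such placeholder ({{APP_ID}}) rather than a value appended after substitution.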
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala                          3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala                 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala         3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala   8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala                     10
5 files changed, 15 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 71650cd773..71d7385b08 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -111,13 +111,14 @@ private[spark] class ExecutorRunner(
     case "{{EXECUTOR_ID}}" => execId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
+    case "{{APP_ID}}" => appId
     case other => other
   }
 
   def getCommandSeq = {
     val command = Command(
       appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+      appDesc.command.arguments.map(substituteVariables),
       appDesc.command.environment,
       appDesc.command.classPathEntries,
       appDesc.command.libraryPathEntries,
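With the {{APP_ID}} case added above, the app ID lands wherever the scheduler backend placed its placeholder, rather than being appended after the already-substituted argument list. A hedged illustration of the ordering bug this removes, with invented values:

// Before: appId was appended after the substituted {{WORKER_URL}}, so the
// backend's fifth positional argument (<appid>) actually held the worker URL.
val before = Seq("driverUrl", "3", "worker-1", "8", "workerUrl") :+ "app-123"
// After: the backend lists {{APP_ID}} explicitly before {{WORKER_URL}}.
val after = Seq("driverUrl", "3", "worker-1", "8", "app-123", "workerUrl")
assert(before(4) == "workerUrl" && after(4) == "app-123")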
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 06061edfc0..c40a3e1667 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
           "<cores> <appid> [<workerUrl>] ")
         System.exit(1)
+
+      // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
+      // and CoarseMesosSchedulerBackend (for mesos mode).
       case 5 =>
         run(args(0), args(1), args(2), args(3).toInt, args(4), None)
       case x if x > 5 =>
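The backend dispatches on argument count: exactly five arguments means no worker URL (the Mesos path), more than five means a worker URL is present (standalone). A minimal, self-contained sketch of that dispatch; the run stub below only stands in for the real run() in this file, which is assumed to take (driverUrl, executorId, hostname, cores, appId, workerUrl).

// Stub standing in for the real run(), which starts the executor backend.
def run(driverUrl: String, executorId: String, hostname: String,
    cores: Int, appId: String, workerUrl: Option[String]): Unit =
  println(s"would launch executor $executorId for $appId on $hostname with $cores cores")

// Arity-based dispatch mirroring the match above.
def dispatch(args: Array[String]): Unit = args.length match {
  case x if x < 5 =>
    sys.error("Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
      "<cores> <appid> [<workerUrl>]")
  case 5 =>
    run(args(0), args(1), args(2), args(3).toInt, args(4), None)            // Mesos
  case _ =>
    run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))   // standalone
}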
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ed209d195e..8c7de75600 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
+      "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
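Of these placeholders, only driverUrl is resolved here (from spark.driver.host and spark.driver.port, per the lines above); the rest are filled in by the Worker when it launches the executor. A hedged example of what the fully substituted argument list could look like, with every value invented:

// Hypothetical substituted arguments for a standalone executor launch.
val launchArgs = Seq(
  "akka.tcp://sparkDriver@driver-host:51234/user/CoarseGrainedScheduler",  // driverUrl
  "3",                                                                     // {{EXECUTOR_ID}}
  "worker-1",                                                              // {{HOSTNAME}}
  "8",                                                                     // {{CORES}}
  "app-20141013233137-0000",                                               // {{APP_ID}}
  "akka.tcp://sparkWorker@worker-1:34567/user/Worker")                     // {{WORKER_URL}}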
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 90828578cd..d7f88de4b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
+          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
         ("cd %s*; " +
-          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
+          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
           .format(basename, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores))
+            offer.getHostname, numCores, appId))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()
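On Mesos there is no Worker to substitute placeholders, so the backend formats the complete command string itself and appends appId as the fifth positional argument; the launched process then hits the case 5 branch in CoarseGrainedExecutorBackend with no worker URL. A hedged illustration of the formatted command, with invented paths and IDs:

// All values are made up, standing in for runScript, driverUrl, the Mesos offer,
// numCores, and appId from the surrounding code.
val cmd = "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
  "/opt/spark/bin/spark-class",
  "akka.tcp://sparkDriver@driver-host:51234/user/CoarseGrainedScheduler",
  "20141013-233137-S1",        // offer.getSlaveId.getValue
  "mesos-slave-1",             // offer.getHostname
  4,                           // numCores
  "app-20141013233137-0000")   // appId
println(cmd)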
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 39ab53cf0b..5e2592e8d2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,14 +26,12 @@ import org.apache.spark.SparkConf
 
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
-    def f(s:String) = new File(s)
+    val appId = "12345-worker321-9876"
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-    val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
-      f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
-
+      Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
     assert(er.getCommandSeq.last === appId)
   }
 }