| author | Andrew Or <andrewor14@gmail.com> | 2014-08-02 00:45:38 -0700 |
| --- | --- | --- |
| committer | Patrick Wendell <pwendell@gmail.com> | 2014-08-02 00:45:38 -0700 |
| commit | 148af6082cdb44840bbd61c7a4f67a95badad10b (patch) | |
| tree | 8acbf61d0c81122c9d6fb3b18940f5b4047f6689 /core | |
| parent | d934801d53fc2f1d57d3534ae4e1e9384c7dda99 (diff) | |
[SPARK-2454] Do not ship spark home to Workers
When standalone Workers launch executors, they inherit the Spark home set by the driver. This means that if the worker machines do not share the driver node's directory structure, the Workers will try to run scripts (e.g. `bin/compute-classpath.sh`) that do not exist locally, and fail. This is a common scenario when the driver is launched from outside the cluster.
The solution is simply to not ship the driver's Spark home to the Workers. This PR also avoids overloading the usages of `spark.home`, which is now used only for setting the executor Spark home on Mesos and in Python.
This builds on top of #1392 and was originally reported by YanTangZhai. Tested on a standalone cluster.
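For context, the heart of the change is the Worker-side lookup in the Worker.scala hunk below: each Worker now derives its Spark home from its own local environment instead of trusting a path shipped by the driver. Here is a minimal, self-contained sketch of that resolution order; the `WorkerSparkHomeSketch` object and its `main` are illustrative scaffolding, not part of the patch:

```scala
import java.io.File

object WorkerSparkHomeSketch {
  // Resolve the Worker's Spark home from its *local* environment, never from
  // a path supplied by the driver (the failure mode this patch fixes).
  // Precedence mirrors the Worker.scala hunk: a test-only override property,
  // then the Worker's own SPARK_HOME, then its working directory.
  def resolveSparkHome(): File = {
    val path = sys.props.get("spark.test.home")   // set by the test harness
      .orElse(sys.env.get("SPARK_HOME"))          // the Worker machine's install
      .getOrElse(".")                             // fall back to the working dir
    new File(path)
  }

  def main(args: Array[String]): Unit =
    println(s"executors will run from: ${resolveSparkHome().getAbsolutePath}")
}
```

Because the lookup happens on the Worker, a driver launched from a machine with a different directory layout can no longer poison the executors' Spark home.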
Author: Andrew Or <andrewor14@gmail.com>
Closes #1734 from andrewor14/spark-home-reprise and squashes the following commits:
f71f391 [Andrew Or] Revert changes in python
1c2532c [Andrew Or] Merge branch 'master' of github.com:apache/spark into spark-home-reprise
188fc5d [Andrew Or] Avoid using spark.home where possible
09272b7 [Andrew Or] Always use Worker's working directory as spark home
Diffstat (limited to 'core')
9 files changed, 13 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 86305d2ea8..65a1a8fd7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: Option[String],
     var appUiUrl: String,
     val eventLogDir: Option[String] = None)
   extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index c4f5e294a3..696f32a6f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
     ("user" -> obj.user) ~
-    ("sparkhome" -> obj.sparkHome) ~
     ("command" -> obj.command.toString)
   }

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index b8ffa9afb6..88a0862b96 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,9 +48,8 @@ private[spark] object TestClient {
     val conf = new SparkConf
     val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
-    val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
-      Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+    val desc = new ApplicationDescription("TestClient", Some(1), 512,
+      Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fb5252da96..c6ea42fceb 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -81,7 +81,8 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+  val sparkHome =
+    new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -233,9 +234,7 @@ private[spark] class Worker(
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host,
-          appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
-          workDir, akkaUrl, conf, ExecutorState.RUNNING)
+          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 48aaaa54bd..a28446f6c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
-    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index de4bd90c8f..e36902ec81 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ import scala.language.postfixOps
 class DriverSuite extends FunSuite with Timeouts {

   test("driver should exit after finishing") {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 093394ad6d..31aa7ec837 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {

   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
-    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
   }

   def createAppInfo() : ApplicationInfo = {
@@ -169,8 +169,7 @@ object JsonConstants {
   val appDescJsonStr =
     """
       |{"name":"name","cores":4,"memoryperslave":1234,
-      |"user":"%s","sparkhome":"sparkHome",
-      |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
+      |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin

   val executorRunnerJsonStr =
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 9190b05e2d..8126ef1bb2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     Utils.executeAndGetOutput(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index ca4d987619..149a2b3d95 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+    val sparkHome = sys.props("spark.test.home")
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
-      sparkHome, "appUiUrl")
+      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
       f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
     assert(er.getCommandSeq.last === appId)
   }