aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-08-02 00:45:38 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-02 00:45:38 -0700
commit148af6082cdb44840bbd61c7a4f67a95badad10b (patch)
tree8acbf61d0c81122c9d6fb3b18940f5b4047f6689 /core
parentd934801d53fc2f1d57d3534ae4e1e9384c7dda99 (diff)
downloadspark-148af6082cdb44840bbd61c7a4f67a95badad10b.tar.gz
spark-148af6082cdb44840bbd61c7a4f67a95badad10b.tar.bz2
spark-148af6082cdb44840bbd61c7a4f67a95badad10b.zip
[SPARK-2454] Do not ship spark home to Workers
When standalone Workers launch executors, they inherit the Spark home set by the driver. This means if the worker machines do not share the same directory structure as the driver node, the Workers will attempt to run scripts (e.g. bin/compute-classpath.sh) that do not exist locally and fail. This is a common scenario if the driver is launched from outside of the cluster. The solution is to simply not pass the driver's Spark home to the Workers. This PR further makes an attempt to avoid overloading the usages of `spark.home`, which is now only used for setting executor Spark home on Mesos and in python. This is based on top of #1392 and originally reported by YanTangZhai. Tested on standalone cluster. Author: Andrew Or <andrewor14@gmail.com> Closes #1734 from andrewor14/spark-home-reprise and squashes the following commits: f71f391 [Andrew Or] Revert changes in python 1c2532c [Andrew Or] Merge branch 'master' of github.com:apache/spark into spark-home-reprise 188fc5d [Andrew Or] Avoid using spark.home where possible 09272b7 [Andrew Or] Always use Worker's working directory as spark home
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala5
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/DriverSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala7
9 files changed, 13 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 86305d2ea8..65a1a8fd7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
- val sparkHome: Option[String],
var appUiUrl: String,
val eventLogDir: Option[String] = None)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index c4f5e294a3..696f32a6f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~
("user" -> obj.user) ~
- ("sparkhome" -> obj.sparkHome) ~
("command" -> obj.command.toString)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index b8ffa9afb6..88a0862b96 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,9 +48,8 @@ private[spark] object TestClient {
val conf = new SparkConf
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
- val desc = new ApplicationDescription(
- "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
- Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+ val desc = new ApplicationDescription("TestClient", Some(1), 512,
+ Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fb5252da96..c6ea42fceb 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -81,7 +81,8 @@ private[spark] class Worker(
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
- val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+ val sparkHome =
+ new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -233,9 +234,7 @@ private[spark] class Worker(
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host,
- appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, conf, ExecutorState.RUNNING)
+ self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 48aaaa54bd..a28446f6c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
- val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+ sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index de4bd90c8f..e36902ec81 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ import scala.language.postfixOps
class DriverSuite extends FunSuite with Timeouts {
test("driver should exit after finishing") {
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val sparkHome = sys.props("spark.test.home")
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 093394ad6d..31aa7ec837 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {
def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
- new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
@@ -169,8 +169,7 @@ object JsonConstants {
val appDescJsonStr =
"""
|{"name":"name","cores":4,"memoryperslave":1234,
- |"user":"%s","sparkhome":"sparkHome",
- |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
+ |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin
val executorRunnerJsonStr =
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 9190b05e2d..8126ef1bb2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String]): String = {
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val sparkHome = sys.props("spark.test.home")
Utils.executeAndGetOutput(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index ca4d987619..149a2b3d95 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+ val sparkHome = sys.props("spark.test.home")
val appDesc = new ApplicationDescription("app name", Some(8), 500,
- Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
- sparkHome, "appUiUrl")
+ Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val appId = "12345-worker321-9876"
- val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+ val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)