author     CodingCat <zhunansjtu@gmail.com>  2014-01-15 20:46:14 -0500
committer  CodingCat <zhunansjtu@gmail.com>  2014-01-20 02:50:30 -0500
commit     29f4b6a2d9f42a727691444312964e59ef9b95ee (patch)
tree       325613a90c29937776d91708a2d649469c4025c0 /core
parent     f9a95d67365509cdd260858e858e7a9b120c1d1b (diff)
fix for SPARK-1027

Change TestClient & Worker to pass sparkHome as Some("xxx"); kill the executor manager if it has already been started when a launch fails; remove the unnecessary .get when fetching "SPARK_HOME" values.
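
At its core, the commit replaces a nullable sparkHome: String with Option[String] and moves the fallback decision to the point of use. A minimal, self-contained sketch of that pattern (AppDesc and resolveSparkHome are illustrative stand-ins, not the actual Spark classes):

case class AppDesc(name: String, sparkHome: Option[String])

object SparkHomeDemo {
  // Fall back to the worker's own Spark home when the app supplies none.
  def resolveSparkHome(desc: AppDesc, workerSparkHome: String): String =
    desc.sparkHome.getOrElse(workerSparkHome)

  def main(args: Array[String]): Unit = {
    val withHome    = AppDesc("app-a", Some("/opt/spark"))
    val withoutHome = AppDesc("app-b", None)
    println(resolveSparkHome(withHome, "/usr/local/spark"))    // /opt/spark
    println(resolveSparkHome(withoutHome, "/usr/local/spark")) // /usr/local/spark
  }
}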
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala                 |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala                          |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala                      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                          |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                          | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala                      |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala              |  4
8 files changed, 18 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index e38459b883..449b953530 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,7 @@ private[spark] class ApplicationDescription(
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
- val sparkHome: String,
+ val sparkHome: Option[String],
val appUiUrl: String)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 5e824e1a67..83ce14a0a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -79,8 +79,7 @@ private[deploy] object DeployMessages {
execId: Int,
appDesc: ApplicationDescription,
cores: Int,
- memory: Int,
- sparkHome: String)
+ memory: Int)
extends DeployMessage
case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ffa909c26b..8017932032 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,7 +49,7 @@ private[spark] object TestClient {
conf = new SparkConf)
val desc = new ApplicationDescription(
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
- "dummy-spark-home", "ignored")
+ Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d9ea96afcf..fe9770cec2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -480,7 +480,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
- launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+ launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
@@ -493,7 +493,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec, app.desc.sparkHome)
+ launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
@@ -502,11 +502,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
- def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
- exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+ exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
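
With sparkHome now carried inside ApplicationDescription, the Master no longer threads it through LaunchExecutor; the Worker reads the Option off the description it already receives. A toy model of the slimmed-down message (simplified types, not the real DeployMessages):

case class AppDescription(name: String, sparkHome: Option[String])
case class LaunchExecutor(appId: String, execId: Int, desc: AppDescription,
                          cores: Int, memory: Int)

object MessageDemo {
  def main(args: Array[String]): Unit = {
    val desc = AppDescription("demo-app", None)
    val msg  = LaunchExecutor("app-20140115", 0, desc, 4, 1024)
    // The receiving worker resolves the directory itself:
    println(msg.desc.sparkHome.getOrElse("/opt/spark-default"))
  }
}

Dropping the extra field removes one way for the message and the description to disagree about the same value.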
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c9e4fc2682..de45da2046 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -204,17 +204,15 @@ private[spark] class Worker(
System.exit(1)
}
- case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+ case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- // TODO (pwendell): We should make sparkHome an Option[String] in
- // ApplicationDescription to be more explicit about this.
- val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+ self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)),
+ workDir, akkaUrl, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -224,6 +222,10 @@ private[spark] class Worker(
}
} catch {
case e: Exception => {
+ logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
+ if (executors.contains(appId + "/" + execId)) {
+ executors(appId + "/" + execId).kill()
+ }
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
}
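
Besides the Option change, the Worker now tears down a half-started ExecutorRunner when the launch throws, instead of leaving it orphaned in the executors map. A standalone sketch of that cleanup pattern (Runner stands in for ExecutorRunner; the failure is simulated):

import scala.collection.mutable

class Runner(val fullId: String) {
  def start(): Unit = ()           // stand-in for spawning the executor process
  def kill(): Unit  = println(s"killed $fullId")
}

object WorkerSketch {
  val executors = mutable.HashMap[String, Runner]()

  def launch(fullId: String): Unit =
    try {
      val manager = new Runner(fullId)
      executors(fullId) = manager
      manager.start()
      throw new RuntimeException("simulated launch failure")
    } catch {
      case _: Exception =>
        // If the runner was registered before the failure, kill it so no
        // half-started executor lingers, then the caller reports FAILED.
        executors.get(fullId).foreach(_.kill())
        executors -= fullId
    }

  def main(args: Array[String]): Unit = launch("app-1/0")
}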
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index faa6e1ebe8..33aac52051 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
- val sparkHome = sc.getSparkHome().getOrElse(null)
+ val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index d05bbd6ff7..693b1ab237 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -74,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
- new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index a79ee690d3..4baa65659f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,11 +26,11 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
- val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
+ val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
f("ooga"), "blah", ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
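
The test follows the same discipline: check the environment, fall back to a system property, and default only at the last moment. A self-contained illustration of the lookup chain used above:

object SparkHomeLookup {
  def main(args: Array[String]): Unit = {
    // Environment variable first, then JVM property; with no trailing .get,
    // a missing value becomes None instead of a NoSuchElementException.
    val sparkHome: Option[String] =
      sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
    println(sparkHome.getOrElse("."))
  }
}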