diff options
author | CodingCat <zhunansjtu@gmail.com> | 2014-01-15 20:46:14 -0500 |
---|---|---|
committer | CodingCat <zhunansjtu@gmail.com> | 2014-01-20 02:50:30 -0500 |
commit | 29f4b6a2d9f42a727691444312964e59ef9b95ee (patch) | |
tree | 325613a90c29937776d91708a2d649469c4025c0 | |
parent | f9a95d67365509cdd260858e858e7a9b120c1d1b (diff) | |
download | spark-29f4b6a2d9f42a727691444312964e59ef9b95ee.tar.gz spark-29f4b6a2d9f42a727691444312964e59ef9b95ee.tar.bz2 spark-29f4b6a2d9f42a727691444312964e59ef9b95ee.zip |
fix for SPARK-1027
change TestClient & Worker to Some("xxx")
kill manager if it is started
remove unnecessary .get when fetch "SPARK_HOME" values
8 files changed, 18 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index e38459b883..449b953530 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -22,7 +22,7 @@ private[spark] class ApplicationDescription( val maxCores: Option[Int], val memoryPerSlave: Int, val command: Command, - val sparkHome: String, + val sparkHome: Option[String], val appUiUrl: String) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 5e824e1a67..83ce14a0a8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -79,8 +79,7 @@ private[deploy] object DeployMessages { execId: Int, appDesc: ApplicationDescription, cores: Int, - memory: Int, - sparkHome: String) + memory: Int) extends DeployMessage case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index ffa909c26b..8017932032 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,7 +49,7 @@ private[spark] object TestClient { conf = new SparkConf) val desc = new ApplicationDescription( "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), - "dummy-spark-home", "ignored") + Some("dummy-spark-home"), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d9ea96afcf..fe9770cec2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -480,7 +480,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act for (pos <- 0 until numUsable) { if (assigned(pos) > 0) { val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) - launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome) + launchExecutor(usableWorkers(pos), exec) app.state = ApplicationState.RUNNING } } @@ -493,7 +493,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val coresToUse = math.min(worker.coresFree, app.coresLeft) if (coresToUse > 0) { val exec = app.addExecutor(worker, coresToUse) - launchExecutor(worker, exec, app.desc.sparkHome) + launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } @@ -502,11 +502,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } - def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { + def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, - exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome) + exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index c9e4fc2682..de45da2046 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -204,17 +204,15 @@ private[spark] class Worker( System.exit(1) } - case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) => + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - // TODO (pwendell): We shuld make sparkHome an Option[String] in - // ApplicationDescription to be more explicit about this. - val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING) + self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)), + workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -224,6 +222,10 @@ private[spark] class Worker( } } catch { case e: Exception => { + logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name)) + if (executors.contains(appId + "/" + execId)) { + executors(appId + "/" + execId).kill() + } masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index faa6e1ebe8..33aac52051 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend( val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) - val sparkHome = sc.getSparkHome().getOrElse(null) + val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index d05bbd6ff7..693b1ab237 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -74,7 +74,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map()) - new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl") } def createAppInfo() : ApplicationInfo = { diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index a79ee690d3..4baa65659f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -26,11 +26,11 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription} class ExecutorRunnerTest extends FunSuite { test("command includes appId") { def f(s:String) = new File(s) - val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")) val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" - val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome), + val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), f("ooga"), "blah", ExecutorState.RUNNING) assert(er.getCommandSeq.last === appId) |