diff options
author | CodingCat <zhunansjtu@gmail.com> | 2014-01-15 19:32:50 -0500 |
---|---|---|
committer | CodingCat <zhunansjtu@gmail.com> | 2014-01-20 02:50:30 -0500 |
commit | f9a95d67365509cdd260858e858e7a9b120c1d1b (patch) | |
tree | 37592dd0681724d9bfa48312c2928ea4f6993255 | |
parent | 792d9084e2bc9f778a00a56fa7dcfe4084153aea (diff) | |
download | spark-f9a95d67365509cdd260858e858e7a9b120c1d1b.tar.gz spark-f9a95d67365509cdd260858e858e7a9b120c1d1b.tar.bz2 spark-f9a95d67365509cdd260858e858e7a9b120c1d1b.zip |
executor creation failed should not make the worker restart
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 312560d706..c9e4fc2682 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -208,18 +208,26 @@ private[spark] class Worker( if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - // TODO (pwendell): We shuld make sparkHome an Option[String] in - // ApplicationDescription to be more explicit about this. - val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING) - executors(appId + "/" + execId) = manager - manager.start() - coresUsed += cores_ - memoryUsed += memory_ - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + try { + logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + // TODO (pwendell): We shuld make sparkHome an Option[String] in + // ApplicationDescription to be more explicit about this. + val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, + self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING) + executors(appId + "/" + execId) = manager + manager.start() + coresUsed += cores_ + memoryUsed += memory_ + masterLock.synchronized { + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + } + } catch { + case e: Exception => { + masterLock.synchronized { + master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) + } + } } } |