aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCodingCat <zhunansjtu@gmail.com>2014-01-15 19:32:50 -0500
committerCodingCat <zhunansjtu@gmail.com>2014-01-20 02:50:30 -0500
commitf9a95d67365509cdd260858e858e7a9b120c1d1b (patch)
tree37592dd0681724d9bfa48312c2928ea4f6993255
parent792d9084e2bc9f778a00a56fa7dcfe4084153aea (diff)
downloadspark-f9a95d67365509cdd260858e858e7a9b120c1d1b.tar.gz
spark-f9a95d67365509cdd260858e858e7a9b120c1d1b.tar.bz2
spark-f9a95d67365509cdd260858e858e7a9b120c1d1b.zip
executor creation failed should not make the worker restart
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala32
1 files changed, 20 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 312560d706..c9e4fc2682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -208,18 +208,26 @@ private[spark] class Worker(
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- // TODO (pwendell): We shuld make sparkHome an Option[String] in
- // ApplicationDescription to be more explicit about this.
- val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ try {
+ logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ // TODO (pwendell): We shuld make sparkHome an Option[String] in
+ // ApplicationDescription to be more explicit about this.
+ val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+ executors(appId + "/" + execId) = manager
+ manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
+ } catch {
+ case e: Exception => {
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+ }
+ }
}
}