aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala2
4 files changed, 22 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index d3674427b1..c3ca43f8d0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
def retryCount = _retryCount
- def incrementRetryCount = {
+ def incrementRetryCount() = {
_retryCount += 1
_retryCount
}
+ def resetRetryCount() = _retryCount = 0
+
def markFinished(endState: ApplicationState.Value) {
state = endState
endTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2a66fcfe48..a3909d6ea9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -296,28 +296,34 @@ private[spark] class Master(
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) => {
+ val appInfo = idToApp(appId)
exec.state = state
+ if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
- val appInfo = idToApp(appId)
// Remove this executor from the worker and app
- logInfo("Removing executor " + exec.fullId + " because it is " + state)
+ logInfo(s"Removing executor ${exec.fullId} because it is $state")
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
- val normalExit = exitStatus.exists(_ == 0)
+ val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
- if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
- schedule()
- } else if (!normalExit) {
- logError("Application %s with ID %s failed %d times, removing it".format(
- appInfo.desc.name, appInfo.id, appInfo.retryCount))
- removeApplication(appInfo, ApplicationState.FAILED)
+ if (!normalExit) {
+ if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+ val execs = appInfo.executors.values
+ if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+ logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+ s"${appInfo.retryCount} times; removing it")
+ removeApplication(appInfo, ApplicationState.FAILED)
+ }
+ }
}
}
}
case None =>
- logWarning("Got status update for unknown executor " + appId + "/" + execId)
+ logWarning(s"Got status update for unknown executor $appId/$execId")
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7be89f9aff..00a43673e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, Charsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+ state = ExecutorState.RUNNING
+ worker ! ExecutorStateChanged(appId, execId, state, None, None)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e475567db6..0c454e4138 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -234,7 +234,7 @@ private[spark] class Worker(
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
+ self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_