aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2014-09-08 20:51:56 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-08 20:51:56 -0700
commit092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33 (patch)
tree4617b2c6247bef0dc434005072e36730b97e5494 /core
parent2b7ab814f9bde65ebc57ebd04386e56c97f06f4a (diff)
downloadspark-092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33.tar.gz
spark-092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33.tar.bz2
spark-092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33.zip
SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors
Introduces a LOADING -> RUNNING ApplicationState transition and prevents Master from removing an Application with RUNNING Executors. Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits over the entire lifetime of the Application, allow that many since any Executor successfully began running the Application; 2) Don't remove the Application while Master still thinks that there are RUNNING Executors. This should be fine as long as the ApplicationInfo doesn't believe any Executors are forever RUNNING when they are not. I think that any non-RUNNING Executors will eventually no longer be RUNNING in Master's accounting, but another set of eyes should confirm that. This PR also doesn't try to detect which nodes have gone rogue or to kill off bad Workers, so repeatedly failing Executors will continue to fail and fill up log files with failure reports as long as the Application keeps running. Author: Mark Hamstra <markhamstra@gmail.com> Closes #1360 from markhamstra/SPARK-2425 and squashes the following commits: f099c0b [Mark Hamstra] Reuse appInfo b2b7b25 [Mark Hamstra] Moved 'Application failed' logging bdd0928 [Mark Hamstra] switched to string interpolation 1dd591b [Mark Hamstra] SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition and prevent Master from removing Application with RUNNING Executors
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala2
4 files changed, 22 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index d3674427b1..c3ca43f8d0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
def retryCount = _retryCount
- def incrementRetryCount = {
+ def incrementRetryCount() = {
_retryCount += 1
_retryCount
}
+ def resetRetryCount() = _retryCount = 0
+
def markFinished(endState: ApplicationState.Value) {
state = endState
endTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2a66fcfe48..a3909d6ea9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -296,28 +296,34 @@ private[spark] class Master(
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) => {
+ val appInfo = idToApp(appId)
exec.state = state
+ if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
- val appInfo = idToApp(appId)
// Remove this executor from the worker and app
- logInfo("Removing executor " + exec.fullId + " because it is " + state)
+ logInfo(s"Removing executor ${exec.fullId} because it is $state")
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
- val normalExit = exitStatus.exists(_ == 0)
+ val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
- if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
- schedule()
- } else if (!normalExit) {
- logError("Application %s with ID %s failed %d times, removing it".format(
- appInfo.desc.name, appInfo.id, appInfo.retryCount))
- removeApplication(appInfo, ApplicationState.FAILED)
+ if (!normalExit) {
+ if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+ val execs = appInfo.executors.values
+ if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+ logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+ s"${appInfo.retryCount} times; removing it")
+ removeApplication(appInfo, ApplicationState.FAILED)
+ }
+ }
}
}
}
case None =>
- logWarning("Got status update for unknown executor " + appId + "/" + execId)
+ logWarning(s"Got status update for unknown executor $appId/$execId")
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7be89f9aff..00a43673e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, Charsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+ state = ExecutorState.RUNNING
+ worker ! ExecutorStateChanged(appId, execId, state, None, None)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e475567db6..0c454e4138 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -234,7 +234,7 @@ private[spark] class Worker(
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
+ self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_