author    Tejas Patil <tejasp@fb.com>    2016-07-15 14:27:16 -0700
committer Shixiong Zhu <shixiong@databricks.com>    2016-07-15 14:27:16 -0700
commit    b2f24f94591082d3ff82bd3db1760b14603b38aa
tree      8cc3f4f5fffd814aba5dc07288ae353c50a8503f /core
parent    611a8ca5895357059f1e7c035d946e0718b26a5a
[SPARK-16230][CORE] CoarseGrainedExecutorBackend should kill itself if there is an exception while creating an Executor
## What changes were proposed in this pull request?

With the fix from SPARK-13112, `LaunchTask` is always processed after `RegisteredExecutor` completes, so executor startup gets a chance to go through all of its retries. There is still a problem: if `Executor` creation itself fails with an exception, the failure goes unnoticed, and the executor is killed only later, when it tries to process a `LaunchTask` while `executor` is null:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L88

The logs therefore do not show that the problem occurred during `Executor` creation, or that this is why the process was killed. This PR explicitly catches exceptions during `Executor` creation, logs a proper message, and then exits the JVM. It also changes the `exitExecutor` method to accept a `reason`, so that backends can use it for things like logging to a database to get a cluster-level aggregate of such exits.

## How was this patch tested?

This change relies on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #14202 from tejasapatil/exit_executor_failure.
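To make the new hook concrete, here is a minimal sketch of how a custom backend could use the `reason` parameter, for example to record exits for cluster-level aggregation. The class name and the `reportExit` sink are hypothetical, introduced only for illustration; they are not part of Spark:

```scala
// Hypothetical sketch only: mirrors the exitExecutor(code, reason, throwable)
// signature introduced by this PR, with a pluggable sink in place of a real DB.
class ReportingBackendSketch(executorId: String) {

  // Stand-in for a durable sink (a database table, a metrics system, ...)
  // whose contents a deployment could aggregate across the cluster.
  private def reportExit(id: String, code: Int, reason: String): Unit =
    Console.err.println(s"executor $id exiting with code $code: $reason")

  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null): Unit = {
    reportExit(executorId, code, reason)
    if (throwable != null) throwable.printStackTrace(Console.err)
    System.exit(code)
  }
}
```

In the patch itself, the base implementation simply logs the reason (with the throwable when present) and exits, as the diff below shows.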
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 32
1 file changed, 20 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ccc6c36e9c..e30839c49c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
- logError(s"Cannot register with driver: $driverUrl", e)
- exitExecutor(1)
+ exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
}(ThreadUtils.sameThread)
}
@@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
- executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+ try {
+ executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+ } catch {
+ case NonFatal(e) =>
+ exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
+ }
case RegisterExecutorFailed(message) =>
- logError("Slave registration failed: " + message)
- exitExecutor(1)
+ exitExecutor(1, "Slave registration failed: " + message)
case LaunchTask(data) =>
if (executor == null) {
- logError("Received LaunchTask command but executor was null")
- exitExecutor(1)
+ exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,8 +100,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
- logError("Received KillTask command but executor was null")
- exitExecutor(1)
+ exitExecutor(1, "Received KillTask command but executor was null")
} else {
executor.killTask(taskId, interruptThread)
}
@@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend(
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
- logError(s"Driver $remoteAddress disassociated! Shutting down.")
- exitExecutor(1)
+ exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -147,7 +148,14 @@ private[spark] class CoarseGrainedExecutorBackend(
* executor exits differently. For e.g. when an executor goes down,
* back-end may not want to take the parent process down.
*/
- protected def exitExecutor(code: Int): Unit = System.exit(code)
+ protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+ if (throwable != null) {
+ logError(reason, throwable)
+ } else {
+ logError(reason)
+ }
+ System.exit(code)
+ }
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {
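
A note on the `scala.util.control.NonFatal` extractor used in the `RegisteredExecutor` handler above: it matches only ordinary exceptions, letting truly fatal throwables (`VirtualMachineError`, `LinkageError`, `InterruptedException`, `ControlThrowable`, ...) propagate rather than being swallowed by the new exit path. A minimal standalone illustration, with a hypothetical demo object:

```scala
import scala.util.control.NonFatal

object NonFatalDemo {
  def main(args: Array[String]): Unit = {
    try {
      // Stand-in for a failing Executor constructor.
      throw new IllegalStateException("unable to create executor")
    } catch {
      // Matches ordinary exceptions; fatal errors fall through
      // and take the JVM down on their own.
      case NonFatal(e) => println(s"caught and reported: ${e.getMessage}")
    }
  }
}
```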