aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTejas Patil <tejasp@fb.com>2016-09-15 10:23:41 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-09-15 10:23:41 -0700
commitb479278142728eb003b9ee466fab0e8d6ec4b13d (patch)
tree7e83576cf49a9eef9e9e99bf966ec7b98c0c027f
parent2ad276954858b0a7b3f442b9e440c72cbb1610e2 (diff)
downloadspark-b479278142728eb003b9ee466fab0e8d6ec4b13d.tar.gz
spark-b479278142728eb003b9ee466fab0e8d6ec4b13d.tar.bz2
spark-b479278142728eb003b9ee466fab0e8d6ec4b13d.zip
[SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before self-kill
## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-17451 `CoarseGrainedExecutorBackend` in some failure cases exits the JVM. While this does not have any issue, from the driver UI there is no specific reason captured for this. In this PR, I am adding functionality to `exitExecutor` to notify driver that the executor is exiting. ## How was this patch tested? Ran the change over a test env and took down shuffle service before the executor could register to it. In the driver logs, where the job failure reason is mentioned (ie. `Job aborted due to stage ...` it gives the correct reason: Before: `ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.` After: `ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running tasks) Reason: Unable to create executor due to java.util.concurrent.TimeoutException: Timeout waiting for task.` Author: Tejas Patil <tejasp@fb.com> Closes #15013 from tejasapatil/SPARK-17451_inform_driver.
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala3
2 files changed, 23 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 391b97d73e..7eec4ae64f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
- exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+ exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
- exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+ exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+ notifyDriver = false)
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend(
* executor exits differently. For e.g. when an executor goes down,
* back-end may not want to take the parent process down.
*/
- protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+ protected def exitExecutor(code: Int,
+ reason: String,
+ throwable: Throwable = null,
+ notifyDriver: Boolean = true) = {
+ val message = "Executor self-exiting due to : " + reason
if (throwable != null) {
- logError(reason, throwable)
+ logError(message, throwable)
} else {
- logError(reason)
+ logError(message)
}
+
+ if (notifyDriver && driver.nonEmpty) {
+ driver.get.ask[Boolean](
+ RemoveExecutor(executorId, new ExecutorLossReason(reason))
+ ).onFailure { case e =>
+ logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
+ }(ThreadUtils.sameThread)
+ }
+
System.exit(code)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a724fdf009..c172ac2cdc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -199,6 +199,9 @@ private[spark] class BlockManager(
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
+ case NonFatal(e) =>
+ throw new SparkException("Unable to register with external shuffle server due to : " +
+ e.getMessage, e)
}
}
}