 core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 26 ++++++++++++++++++++------
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala                  |  3 +++
 2 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 391b97d73e..7eec4ae64f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
- exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+ exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
- exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+ exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+ notifyDriver = false)
} else {
logWarning(s"An unknown ($remoteAddress) driver disconnected.")
}
@@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend(
* executor exits differently. For e.g. when an executor goes down,
* back-end may not want to take the parent process down.
*/
- protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+ protected def exitExecutor(code: Int,
+ reason: String,
+ throwable: Throwable = null,
+ notifyDriver: Boolean = true) = {
+ val message = "Executor self-exiting due to : " + reason
if (throwable != null) {
- logError(reason, throwable)
+ logError(message, throwable)
} else {
- logError(reason)
+ logError(message)
}
+
+ if (notifyDriver && driver.nonEmpty) {
+ driver.get.ask[Boolean](
+ RemoveExecutor(executorId, new ExecutorLossReason(reason))
+ ).onFailure { case e =>
+ logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
+ }(ThreadUtils.sameThread)
+ }
+
System.exit(code)
}
}
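
Note on the hunks above: exitExecutor now makes a best-effort attempt to notify the driver with a RemoveExecutor message before calling System.exit, and the two paths where the driver is known to be unreachable (registration failure, driver disassociation) opt out with notifyDriver = false. The following is a minimal, Spark-free sketch of that notify-then-exit pattern; ExitSketch, the notify callback, and its String => Future[Boolean] shape are illustrative stand-ins, not Spark's RpcEndpointRef API.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Sketch of the notify-then-exit pattern from the diff above.
// `notify` stands in for driver.get.ask[Boolean](RemoveExecutor(...));
// the names and types here are illustrative, not Spark's actual API.
object ExitSketch {
  def exitExecutor(
      code: Int,
      reason: String,
      throwable: Throwable = null,
      notify: Option[String => Future[Boolean]] = None): Unit = {
    val message = "Executor self-exiting due to : " + reason
    // Log the reason first so it is recorded even if notification fails.
    Console.err.println(message)
    if (throwable != null) throwable.printStackTrace()

    // Best-effort, asynchronous notification: a failure to reach the
    // driver is logged but never blocks or prevents the exit below.
    notify.foreach { ask =>
      implicit val ec: ExecutionContext = ExecutionContext.global
      ask(reason).onComplete {
        case Success(_) => // driver acknowledged the removal
        case Failure(e) =>
          Console.err.println("Unable to notify the driver due to " + e.getMessage)
      }
    }
    sys.exit(code)
  }
}

As in the diff, the exit never depends on the RPC completing, which is why the registration-failure and disassociation branches can simply skip notification: there is no driver endpoint worth asking on those paths.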
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a724fdf009..c172ac2cdc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -199,6 +199,9 @@ private[spark] class BlockManager(
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
+ case NonFatal(e) =>
+ throw new SparkException("Unable to register with external shuffle server due to : " +
+ e.getMessage, e)
}
}
}
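
This second hunk tightens the retry loop around external shuffle server registration: connection failures keep retrying via the pre-existing branch above, while any other non-fatal error now fails fast as a SparkException instead of exhausting the retries. Below is a self-contained sketch of that catch ordering; MAX_ATTEMPTS, SLEEP_TIME_SECS, the register() callback, and the guard on the first case stand in for code outside this hunk's context and are assumptions, not the exact BlockManager source.

import scala.util.control.NonFatal

// Sketch of the amended registration retry loop. The first case models the
// pre-existing retry branch (assumed to be guarded so it only fires while
// attempts remain); the NonFatal case is the behaviour added by the diff.
object RegisterSketch {
  final class SparkException(msg: String, cause: Throwable) extends Exception(msg, cause)

  val MAX_ATTEMPTS = 3
  val SLEEP_TIME_SECS = 5

  def registerWithRetries(register: () => Unit): Unit = {
    for (i <- 1 to MAX_ATTEMPTS) {
      try {
        register()
        return
      } catch {
        // Pre-existing behaviour: transient connection failures are
        // retried after a pause, as long as attempts remain.
        case e: java.io.IOException if i < MAX_ATTEMPTS =>
          Console.err.println(s"Failed to connect, will retry ${MAX_ATTEMPTS - i}" +
            s" more times after waiting $SLEEP_TIME_SECS seconds...")
          Thread.sleep(SLEEP_TIME_SECS * 1000)
        // New behaviour from the diff: any other non-fatal error is
        // wrapped and rethrown immediately, surfacing the root cause.
        case NonFatal(e) =>
          throw new SparkException("Unable to register with external shuffle server due to : " +
            e.getMessage, e)
      }
    }
  }
}

One consequence of this ordering in the sketch: on the final attempt the guard on the first case no longer matches, so even a connection failure falls through to the NonFatal branch and is reported as a SparkException rather than leaving the loop silently.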