aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala8
2 files changed, 29 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a602fcac68..86c121f787 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -114,6 +114,19 @@ private[spark] class Executor(
private val heartbeatReceiverRef =
RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
+ /**
+ * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+ * times, it should kill itself. The default value is 60. It means we will retry to send
+ * heartbeats about 10 minutes because the heartbeat interval is 10s.
+ */
+ private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+
+ /**
+ * Count the failure times of heartbeat. It should only be acessed in the heartbeat thread. Each
+ * successful heartbeat will reset it to 0.
+ */
+ private var heartbeatFailures = 0
+
startDriverHeartbeater()
def launchTask(
@@ -461,8 +474,15 @@ private[spark] class Executor(
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
+ heartbeatFailures = 0
} catch {
- case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
+ case NonFatal(e) =>
+ logWarning("Issue communicating with driver in heartbeater", e)
+ logError(s"Unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times")
+ heartbeatFailures += 1
+ if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
+ System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index ea36fb60bd..99858f7856 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -39,6 +39,12 @@ object ExecutorExitCode {
/** ExternalBlockStore failed to create a local temporary directory after many attempts. */
val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
+ /**
+ * Executor is unable to send heartbeats to the driver more than
+ * "spark.executor.heartbeat.maxFailures" times.
+ */
+ val HEARTBEAT_FAILURE = 56
+
def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@ object ExecutorExitCode {
// TODO: replace external block store with concrete implementation name
case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
"ExternalBlockStore failed to create a local temporary directory."
+ case HEARTBEAT_FAILURE =>
+ "Unable to send heartbeats to driver."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {