diff options
author | Nong Li <nong@databricks.com> | 2015-12-18 16:05:18 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-12-18 16:05:18 -0800 |
commit | 0514e8d4b69615ba8918649e7e3c46b5713b6540 (patch) | |
tree | 8f90d360c00d7dc8fd50fbf2db7310c02ffc2d76 /core/src | |
parent | 60da0e11f6724d86df16795a7a1166879215d547 (diff) | |
download | spark-0514e8d4b69615ba8918649e7e3c46b5713b6540.tar.gz spark-0514e8d4b69615ba8918649e7e3c46b5713b6540.tar.bz2 spark-0514e8d4b69615ba8918649e7e3c46b5713b6540.zip |
[SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat interval
Previously, the rpc timeout was the default network timeout, which is the same value
the driver uses to determine dead executors. This means if there is a network issue,
the executor is determined dead after one heartbeat attempt. There is a separate config
for the heartbeat interval which is a better value to use for the heartbeat RPC. With
this change, the executor will make multiple heartbeat attempts even with RPC issues.
Author: Nong Li <nong@databricks.com>
Closes #10365 from nongli/spark-12411.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 552b644d13..9b14184364 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.rpc.RpcTimeout import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} @@ -445,7 +446,8 @@ private[spark] class Executor( val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) try { - val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message) + val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( + message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() |