author    Nong Li <nong@databricks.com>    2015-12-18 16:05:18 -0800
committer Andrew Or <andrew@databricks.com>    2015-12-18 16:05:18 -0800
commit    0514e8d4b69615ba8918649e7e3c46b5713b6540 (patch)
tree      8f90d360c00d7dc8fd50fbf2db7310c02ffc2d76
parent    60da0e11f6724d86df16795a7a1166879215d547 (diff)
[SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat interval
Previously, the RPC timeout for the executor heartbeat was the default network timeout, which is the same value the driver uses to determine that an executor is dead. This means that if there is a network issue, the executor is declared dead after a single heartbeat attempt. There is a separate config for the heartbeat interval, which is a better value to use for the heartbeat RPC. With this change, the executor will make multiple heartbeat attempts even when there are RPC issues.

Author: Nong Li <nong@databricks.com>

Closes #10365 from nongli/spark-12411.
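As a rough illustration of the two settings involved (both are standard Spark configuration keys; the cited defaults of 10s and 120s are assumptions about this Spark version, and the app name is hypothetical), a driver-side SparkConf could be set up like this so that several heartbeat attempts, each bounded by spark.executor.heartbeatInterval, can fail before the network timeout expires and the driver marks the executor dead:

import org.apache.spark.SparkConf

// Sketch only: the executor's heartbeat RPC now times out after
// spark.executor.heartbeatInterval (assumed default 10s), while the driver
// still waits the longer network timeout (assumed default 120s) before it
// declares the executor dead, leaving room for several retry attempts.
val conf = new SparkConf()
  .setAppName("heartbeat-timeout-example")
  .set("spark.executor.heartbeatInterval", "10s")
  .set("spark.network.timeout", "120s")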
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  4
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 552b644d13..9b14184364 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -445,7 +446,8 @@ private[spark] class Executor(
val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
try {
- val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
+ val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+ message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
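For reference, a minimal sketch of how the new timeout is resolved from the configuration (RpcTimeout is Spark-internal, so this is illustrative rather than user-facing API; the 15s value is an assumption):

import org.apache.spark.SparkConf
import org.apache.spark.rpc.RpcTimeout

// Sketch only: RpcTimeout reads the named property from the conf and falls
// back to the supplied default ("10s") when the property is not set.
val conf = new SparkConf().set("spark.executor.heartbeatInterval", "15s")
val heartbeatTimeout = RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")
// heartbeatTimeout.duration is 15 seconds here; without the .set above it would be 10s.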