diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-09-23 13:44:18 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-09-23 13:44:18 -0700 |
commit | d79238d03a2ffe0cf5fc6166543d67768693ddbe (patch) | |
tree | 674d3f4674d23c2ac9a5a5bbffbb8fd59746dae0 /core/src | |
parent | 8dfe79ffb204807945e3c09b75c7255b09ad2a97 (diff) | |
download | spark-d79238d03a2ffe0cf5fc6166543d67768693ddbe.tar.gz spark-d79238d03a2ffe0cf5fc6166543d67768693ddbe.tar.bz2 spark-d79238d03a2ffe0cf5fc6166543d67768693ddbe.zip |
SPARK-3612. Executor shouldn't quit if heartbeat message fails to reach the driver
Author: Sandy Ryza <sandy@cloudera.com>
Closes #2487 from sryza/sandy-spark-3612 and squashes the following commits:
2b7353d [Sandy Ryza] SPARK-3612. Executor shouldn't quit if heartbeat message fails to reach the driver
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 16 |
1 file changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index acae448a9c..d7211ae465 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -24,6 +24,7 @@ import java.util.concurrent._ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil @@ -375,12 +376,17 @@ private[spark] class Executor( } val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) - val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, - retryAttempts, retryIntervalMs, timeout) - if (response.reregisterBlockManager) { - logWarning("Told to re-register on heartbeat") - env.blockManager.reregister() + try { + val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, + retryAttempts, retryIntervalMs, timeout) + if (response.reregisterBlockManager) { + logWarning("Told to re-register on heartbeat") + env.blockManager.reregister() + } + } catch { + case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t) } + Thread.sleep(interval) } } |