aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-09-23 13:44:18 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-23 13:44:18 -0700
commitd79238d03a2ffe0cf5fc6166543d67768693ddbe (patch)
tree674d3f4674d23c2ac9a5a5bbffbb8fd59746dae0 /core
parent8dfe79ffb204807945e3c09b75c7255b09ad2a97 (diff)
downloadspark-d79238d03a2ffe0cf5fc6166543d67768693ddbe.tar.gz
spark-d79238d03a2ffe0cf5fc6166543d67768693ddbe.tar.bz2
spark-d79238d03a2ffe0cf5fc6166543d67768693ddbe.zip
SPARK-3612. Executor shouldn't quit if heartbeat message fails to reach ...
...the driver Author: Sandy Ryza <sandy@cloudera.com> Closes #2487 from sryza/sandy-spark-3612 and squashes the following commits: 2b7353d [Sandy Ryza] SPARK-3612. Executor shouldn't quit if heartbeat message fails to reach the driver
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala16
1 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index acae448a9c..d7211ae465 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -375,12 +376,17 @@ private[spark] class Executor(
}
val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
- val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
- retryAttempts, retryIntervalMs, timeout)
- if (response.reregisterBlockManager) {
- logWarning("Told to re-register on heartbeat")
- env.blockManager.reregister()
+ try {
+ val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+ retryAttempts, retryIntervalMs, timeout)
+ if (response.reregisterBlockManager) {
+ logWarning("Told to re-register on heartbeat")
+ env.blockManager.reregister()
+ }
+ } catch {
+ case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
}
+
Thread.sleep(interval)
}
}