aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala16
1 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index acae448a9c..d7211ae465 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -375,12 +376,17 @@ private[spark] class Executor(
}
val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
- val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
- retryAttempts, retryIntervalMs, timeout)
- if (response.reregisterBlockManager) {
- logWarning("Told to re-register on heartbeat")
- env.blockManager.reregister()
+ try {
+ val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+ retryAttempts, retryIntervalMs, timeout)
+ if (response.reregisterBlockManager) {
+ logWarning("Told to re-register on heartbeat")
+ env.blockManager.reregister()
+ }
+ } catch {
+ case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
}
+
Thread.sleep(interval)
}
}