about | summary | refs | log | tree | commit | diff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 26
1 files changed, 17 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 68d05d5b02..f2b024ff6c 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
private var timeoutCheckingTask: ScheduledFuture[_] = null
- private val timeoutCheckingThread =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
+ // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
+ // block the thread for a long time.
+ private val eventLoopThread =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
override def onStart(): Unit = {
- timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
+ timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ExpireDeadHosts))
}
@@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
- val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, taskMetrics, blockManagerId)
- val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
- context.reply(response)
+ eventLoopThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ val unknownExecutor = !scheduler.executorHeartbeatReceived(
+ executorId, taskMetrics, blockManagerId)
+ val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+ context.reply(response)
+ }
+ })
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
@@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
- override def run(): Unit = sc.killExecutor(executorId)
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ sc.killExecutor(executorId)
+ }
})
}
executorLastSeen.remove(executorId)
@@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel(true)
}
- timeoutCheckingThread.shutdownNow()
+ eventLoopThread.shutdownNow()
killExecutorThread.shutdownNow()
}
}