path: root/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
author     Andrew Or <andrew@databricks.com>  2015-07-02 13:59:56 -0700
committer  Andrew Or <andrew@databricks.com>  2015-07-02 13:59:56 -0700
commit     cd2035507891a7f426f6f45902d3b5f4fdbe88cf (patch)
tree       a03f61825371db98531f61c63ec9fc67dd2f42ae /core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
parent     fcbcba66c92871fe3936e5ca605017e9c2a2eb95 (diff)
[SPARK-7835] Refactor HeartbeatReceiverSuite for coverage + cleanup
The existing test suite has a lot of duplicate code and doesn't even cover the most fundamental feature of the HeartbeatReceiver, which is expiring hosts that have not responded in a while. This introduces manual clocks in `HeartbeatReceiver` and makes it respond to heartbeats only for registered executors. A few internal messages are moved to `receiveAndReply` to increase the determinism of the tests so we don't have to rely on flaky constructs like `eventually`.

Author: Andrew Or <andrew@databricks.com>

Closes #7173 from andrewor14/heartbeat-receiver-tests and squashes the following commits:

4a903d6 [Andrew Or] Increase HeartbeatReceiverSuite coverage and clean up
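For illustration, here is a minimal sketch of how the injected clock and the new local messages could be exercised in a test. This is not code from the patch or from HeartbeatReceiverSuite; the endpoint name, executor id, and the amount of time advanced are assumptions, and `sc` is taken to be an existing SparkContext.

import org.apache.spark.util.ManualClock

// Inject a manual clock so the test, not wall-clock time, decides when hosts expire.
val clock = new ManualClock
val heartbeatReceiver = new HeartbeatReceiver(sc, clock)
// "heartbeat-test" is an arbitrary name chosen for this sketch.
val receiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat-test", heartbeatReceiver)

// Let the receiver pick up sc.taskScheduler so expiration can notify the scheduler.
receiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)

// Register an executor via the new local message, then advance the clock far past the
// executor timeout so the next ExpireDeadHosts pass deterministically expires it.
receiverRef.askWithRetry[Boolean](ExecutorRegistered("executor-1"))
clock.advance(10 * 60 * 1000)  // 10 minutes, assumed to exceed the configured executor timeout
receiverRef.askWithRetry[Boolean](ExpireDeadHosts)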
Diffstat (limited to 'core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala  89
1 file changed, 69 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 6909015ff6..221b1dab43 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -24,8 +24,8 @@ import scala.collection.mutable
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -45,13 +45,23 @@ private[spark] case object TaskSchedulerIsSet
private[spark] case object ExpireDeadHosts
+private case class ExecutorRegistered(executorId: String)
+
+private case class ExecutorRemoved(executorId: String)
+
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
* Lives in the driver to receive heartbeats from executors.
*/
-private[spark] class HeartbeatReceiver(sc: SparkContext)
- extends ThreadSafeRpcEndpoint with Logging {
+private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
+ extends ThreadSafeRpcEndpoint with SparkListener with Logging {
+
+ def this(sc: SparkContext) {
+ this(sc, new SystemClock)
+ }
+
+ sc.addSparkListener(this)
override val rpcEnv: RpcEnv = sc.env.rpcEnv
@@ -86,30 +96,48 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
override def onStart(): Unit = {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(ExpireDeadHosts))
+ Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
}
}, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}
- override def receive: PartialFunction[Any, Unit] = {
- case ExpireDeadHosts =>
- expireDeadHosts()
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+
+ // Messages sent and received locally
+ case ExecutorRegistered(executorId) =>
+ executorLastSeen(executorId) = clock.getTimeMillis()
+ context.reply(true)
+ case ExecutorRemoved(executorId) =>
+ executorLastSeen.remove(executorId)
+ context.reply(true)
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
- }
+ context.reply(true)
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+ context.reply(true)
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ // Messages received from executors
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
- executorLastSeen(executorId) = System.currentTimeMillis()
- eventLoopThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, taskMetrics, blockManagerId)
- val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
- context.reply(response)
- }
- })
+ if (executorLastSeen.contains(executorId)) {
+ executorLastSeen(executorId) = clock.getTimeMillis()
+ eventLoopThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ val unknownExecutor = !scheduler.executorHeartbeatReceived(
+ executorId, taskMetrics, blockManagerId)
+ val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+ context.reply(response)
+ }
+ })
+ } else {
+ // This may happen if we get an executor's in-flight heartbeat immediately
+ // after we just removed it. It's not really an error condition so we should
+ // not log warning here. Otherwise there may be a lot of noise especially if
+ // we explicitly remove executors (SPARK-4134).
+ logDebug(s"Received heartbeat from unknown executor $executorId")
+ context.reply(HeartbeatResponse(reregisterBlockManager = true))
+ }
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
@@ -119,9 +147,30 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
}
}
+ /**
+ * If the heartbeat receiver is not stopped, notify it of executor registrations.
+ */
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+ Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId)))
+ }
+
+ /**
+ * If the heartbeat receiver is not stopped, notify it of executor removals so it doesn't
+ * log superfluous errors.
+ *
+ * Note that we must do this after the executor is actually removed to guard against the
+ * following race condition: if we remove an executor's metadata from our data structure
+ * prematurely, we may get an in-flight heartbeat from the executor before the executor is
+ * actually removed, in which case we will still mark the executor as a dead host later
+ * and expire it with loud error messages.
+ */
+ override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+ Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId)))
+ }
+
private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
- val now = System.currentTimeMillis()
+ val now = clock.getTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +