From 88504b75ee610e14d7ceed8b038fa698a3d14f81 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Fri, 3 Apr 2015 11:44:27 -0700
Subject: [SPARK-6640][Core] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver

This PR moves the code that creates `HeartbeatReceiver` above the code that creates `schedulerBackend` to resolve the race condition.

Author: zsxwing

Closes #5306 from zsxwing/SPARK-6640 and squashes the following commits:

840399d [zsxwing] Don't send TaskScheduler through Akka
a90616a [zsxwing] Fix docs
dd202c7 [zsxwing] Fix typo
d7c250d [zsxwing] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
---
 .../scala/org/apache/spark/HeartbeatReceiver.scala | 32 +++++++++++++++++-----
 .../main/scala/org/apache/spark/SparkContext.scala | 10 +++++--
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 8435e1ea26..9f8ad03b91 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -37,6 +37,12 @@ private[spark] case class Heartbeat(
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+/**
+ * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
+ * created.
+ */
+private[spark] case object TaskSchedulerIsSet
+
 private[spark] case object ExpireDeadHosts
 
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
@@ -44,9 +50,11 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext)
   extends Actor with ActorLogReceive with Logging {
 
+  private var scheduler: TaskScheduler = null
+
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
 
@@ -71,12 +79,22 @@
   }
 
   override def receiveWithLogging: PartialFunction[Any, Unit] = {
-    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val unknownExecutor = !scheduler.executorHeartbeatReceived(
-        executorId, taskMetrics, blockManagerId)
-      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
-      executorLastSeen(executorId) = System.currentTimeMillis()
-      sender ! response
+    case TaskSchedulerIsSet =>
+      scheduler = sc.taskScheduler
+    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      if (scheduler != null) {
+        val unknownExecutor = !scheduler.executorHeartbeatReceived(
+          executorId, taskMetrics, blockManagerId)
+        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+        executorLastSeen(executorId) = System.currentTimeMillis()
+        sender ! response
+      } else {
+        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
+        // case rarely happens. However, if it really happens, log it and ask the executor to
+        // register itself again.
+        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
+        sender ! HeartbeatResponse(reregisterBlockManager = true)
+      }
     case ExpireDeadHosts =>
       expireDeadHosts()
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index abf81e312d..fd1838976e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   val sparkUser = Utils.getCurrentUserName()
   executorEnvs("SPARK_USER") = sparkUser
 
+  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+  private val heartbeatReceiver = env.actorSystem.actorOf(
+    Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
+
   // Create and start the scheduler
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
-  private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
+
+  heartbeatReceiver ! TaskSchedulerIsSet
+
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
-- 
cgit v1.2.3
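
The pattern behind the fix, reduced to a minimal self-contained sketch: register the actor before its dependency exists, then hand the dependency over later with a message, and answer "please re-register" to any heartbeat that arrives before the wiring is complete. The Scheduler trait, Receiver actor, and message names below are hypothetical stand-ins for illustration, not Spark's actual classes.

    import akka.actor.{Actor, ActorSystem, Props}

    // Simplified stand-ins for Spark's TaskScheduler and heartbeat messages.
    trait Scheduler { def heartbeatReceived(executorId: String): Boolean }
    case object SchedulerIsSet
    final case class Heartbeat(executorId: String)
    final case class HeartbeatResponse(reregisterBlockManager: Boolean)

    // The receiver can be registered before the scheduler exists; the scheduler
    // is wired in later via SchedulerIsSet instead of through the constructor.
    class Receiver(lookupScheduler: () => Scheduler) extends Actor {
      private var scheduler: Scheduler = null

      def receive: Receive = {
        case SchedulerIsSet =>
          scheduler = lookupScheduler()
        case Heartbeat(executorId) =>
          if (scheduler != null) {
            // Normal path: ask the scheduler whether it knows this executor.
            val unknown = !scheduler.heartbeatReceived(executorId)
            sender() ! HeartbeatResponse(reregisterBlockManager = unknown)
          } else {
            // Scheduler not wired up yet: ask the executor to re-register.
            sender() ! HeartbeatResponse(reregisterBlockManager = true)
          }
      }
    }

    object Wiring {
      @volatile private var scheduler: Scheduler = null

      def main(args: Array[String]): Unit = {
        val system = ActorSystem("demo")

        // 1. Register the receiver first, so anything started by the scheduler
        //    (e.g. a local-mode executor) can already look it up by name.
        val receiver =
          system.actorOf(Props(new Receiver(() => scheduler)), "HeartbeatReceiver")

        // 2. Create the scheduler, then tell the receiver it is ready,
        //    mirroring the reordering done in SparkContext by this patch.
        scheduler = new Scheduler {
          def heartbeatReceived(executorId: String): Boolean = true
        }
        receiver ! SchedulerIsSet

        system.terminate()
      }
    }

The key design choice, as in the patch itself, is that the dependency is fetched on the actor's own thread when the notification arrives, so no partially constructed scheduler reference is ever shared across threads through the constructor.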