about summary refs log tree commit diff
diff options
context:
space:
mode:
author    zsxwing <zsxwing@gmail.com>    2015-04-03 11:44:27 -0700
committer    Andrew Or <andrew@databricks.com>    2015-04-03 11:44:27 -0700
commit    88504b75ee610e14d7ceed8b038fa698a3d14f81 (patch)
tree    eb6d02998c65deb378cba5edc8bfb36df9afdf0c
parent    2c43ea38ee0db6b292c14baf6bc6f8d16f509c9d (diff)
downloadspark-88504b75ee610e14d7ceed8b038fa698a3d14f81.tar.gz
spark-88504b75ee610e14d7ceed8b038fa698a3d14f81.tar.bz2
spark-88504b75ee610e14d7ceed8b038fa698a3d14f81.zip
[SPARK-6640][Core] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
This PR moved the code of creating `HeartbeatReceiver` above the code of creating `schedulerBackend` to resolve the race condition. Author: zsxwing <zsxwing@gmail.com> Closes #5306 from zsxwing/SPARK-6640 and squashes the following commits: 840399d [zsxwing] Don't send TaskScheduler through Akka a90616a [zsxwing] Fix docs dd202c7 [zsxwing] Fix typo d7c250d [zsxwing] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
-rw-r--r--    core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 32
-rw-r--r--    core/src/main/scala/org/apache/spark/SparkContext.scala | 10
2 files changed, 33 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 8435e1ea26..9f8ad03b91 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -37,6 +37,12 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)
+/**
+ * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
+ * created.
+ */
+private[spark] case object TaskSchedulerIsSet
+
private[spark] case object ExpireDeadHosts
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
@@ -44,9 +50,11 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
* Lives in the driver to receive heartbeats from executors..
*/
-private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext)
extends Actor with ActorLogReceive with Logging {
+ private var scheduler: TaskScheduler = null
+
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]
@@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
}
override def receiveWithLogging: PartialFunction[Any, Unit] = {
- case Heartbeat(executorId, taskMetrics, blockManagerId) =>
- val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, taskMetrics, blockManagerId)
- val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
- executorLastSeen(executorId) = System.currentTimeMillis()
- sender ! response
+ case TaskSchedulerIsSet =>
+ scheduler = sc.taskScheduler
+ case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+ if (scheduler != null) {
+ val unknownExecutor = !scheduler.executorHeartbeatReceived(
+ executorId, taskMetrics, blockManagerId)
+ val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+ executorLastSeen(executorId) = System.currentTimeMillis()
+ sender ! response
+ } else {
+ // Because Executor will sleep several seconds before sending the first "Heartbeat", this
+ // case rarely happens. However, if it really happens, log it and ask the executor to
+ // register itself again.
+ logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
+ sender ! HeartbeatResponse(reregisterBlockManager = true)
+ }
case ExpireDeadHosts =>
expireDeadHosts()
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index abf81e312d..fd1838976e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val sparkUser = Utils.getCurrentUserName()
executorEnvs("SPARK_USER") = sparkUser
+ // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+ // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+ private val heartbeatReceiver = env.actorSystem.actorOf(
+ Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
+
// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
- private val heartbeatReceiver = env.actorSystem.actorOf(
- Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
+
+ heartbeatReceiver ! TaskSchedulerIsSet
+
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)