diff options
author | zsxwing <zsxwing@gmail.com> | 2015-01-11 16:23:28 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-01-11 16:23:28 -0800 |
commit | 6942b974adad396cba2799eac1fa90448cea4da7 (patch) | |
tree | 80d28dcbdca54510bcb22fdfe52d0d0ebe23159c | |
parent | 1656aae2b4e8b026f8cfe782519f72d32ed2b291 (diff) | |
download | spark-6942b974adad396cba2799eac1fa90448cea4da7.tar.gz spark-6942b974adad396cba2799eac1fa90448cea4da7.tar.bz2 spark-6942b974adad396cba2799eac1fa90448cea4da7.zip |
[SPARK-4951][Core] Fix the issue that a busy executor may be killed
A few changes to fix this issue:
1. Handle the case that receiving `SparkListenerTaskStart` before `SparkListenerBlockManagerAdded`.
2. Don't add `executorId` to `removeTimes` when the executor is busy.
3. Use `HashMap.retain` to safely traverse the HashMap and remove items.
4. Use the same lock in ExecutorAllocationManager and ExecutorAllocationListener to fix the race condition in `totalPendingTasks`.
5. Move the blocking codes out of the message processing code in YarnSchedulerActor.
Author: zsxwing <zsxwing@gmail.com>
Closes #3783 from zsxwing/SPARK-4951 and squashes the following commits:
d51fa0d [zsxwing] Add comments
2e365ce [zsxwing] Remove expired executors from 'removeTimes' and add idle executors back when a new executor joins
49f61a9 [zsxwing] Eliminate duplicate executor registered warnings
d4c4e9a [zsxwing] Minor fixes for the code style
05f6238 [zsxwing] Move the blocking codes out of the message processing code
105ba3a [zsxwing] Fix the race condition in totalPendingTasks
d5c615d [zsxwing] Fix the issue that a busy executor may be killed
3 files changed, 144 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index e9e90e3f2f..a0ee2a7cbb 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager( listenerBus: LiveListenerBus, conf: SparkConf) extends Logging { + + allocationManager => + import ExecutorAllocationManager._ // Lower and upper bounds on the number of executors. These are required. @@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager( private var clock: Clock = new RealClock // Listener for Spark events that impact the allocation policy - private val listener = new ExecutorAllocationListener(this) + private val listener = new ExecutorAllocationListener /** * Verify that the settings specified through the config are valid. @@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager( addTime += sustainedSchedulerBacklogTimeout * 1000 } - removeTimes.foreach { case (executorId, expireTime) => - if (now >= expireTime) { + removeTimes.retain { case (executorId, expireTime) => + val expired = now >= expireTime + if (expired) { removeExecutor(executorId) - removeTimes.remove(executorId) } + !expired } } @@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager( // Do not kill the executor if we have already reached the lower bound val numExistingExecutors = executorIds.size - executorsPendingToRemove.size if (numExistingExecutors - 1 < minNumExecutors) { - logInfo(s"Not removing idle executor $executorId because there are only " + + logDebug(s"Not removing idle executor $executorId because there are only " + s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") return false } @@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager( private def onExecutorAdded(executorId: String): Unit = synchronized { if (!executorIds.contains(executorId)) { executorIds.add(executorId) - executorIds.foreach(onExecutorIdle) + // If an executor (call this executor X) is not removed because the lower bound + // has been reached, it will no longer be marked as idle. When new executors join, + // however, we are no longer at the lower bound, and so we must mark executor X + // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951) + executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle) logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})") if (numExecutorsPending > 0) { numExecutorsPending -= 1 @@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager( * the executor is not already marked as idle. */ private def onExecutorIdle(executorId: String): Unit = synchronized { - if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) { - logDebug(s"Starting idle timer for $executorId because there are no more tasks " + - s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)") - removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000 + if (executorIds.contains(executorId)) { + if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) { + logDebug(s"Starting idle timer for $executorId because there are no more tasks " + + s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)") + removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000 + } + } else { + logWarning(s"Attempted to mark unknown executor $executorId idle") } } @@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager( * and consistency of events returned by the listener. For simplicity, it does not account * for speculated tasks. */ - private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager) - extends SparkListener { + private class ExecutorAllocationListener extends SparkListener { private val stageIdToNumTasks = new mutable.HashMap[Int, Int] private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { - synchronized { - val stageId = stageSubmitted.stageInfo.stageId - val numTasks = stageSubmitted.stageInfo.numTasks + val stageId = stageSubmitted.stageInfo.stageId + val numTasks = stageSubmitted.stageInfo.numTasks + allocationManager.synchronized { stageIdToNumTasks(stageId) = numTasks allocationManager.onSchedulerBacklogged() } } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { - synchronized { - val stageId = stageCompleted.stageInfo.stageId + val stageId = stageCompleted.stageInfo.stageId + allocationManager.synchronized { stageIdToNumTasks -= stageId stageIdToTaskIndices -= stageId @@ -426,39 +437,49 @@ private[spark] class ExecutorAllocationManager( } } - override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { val stageId = taskStart.stageId val taskId = taskStart.taskInfo.taskId val taskIndex = taskStart.taskInfo.index val executorId = taskStart.taskInfo.executorId - // If this is the last pending task, mark the scheduler queue as empty - stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex - val numTasksScheduled = stageIdToTaskIndices(stageId).size - val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1) - if (numTasksScheduled == numTasksTotal) { - // No more pending tasks for this stage - stageIdToNumTasks -= stageId - if (stageIdToNumTasks.isEmpty) { - allocationManager.onSchedulerQueueEmpty() + allocationManager.synchronized { + // This guards against the race condition in which the `SparkListenerTaskStart` + // event is posted before the `SparkListenerBlockManagerAdded` event, which is + // possible because these events are posted in different threads. (see SPARK-4951) + if (!allocationManager.executorIds.contains(executorId)) { + allocationManager.onExecutorAdded(executorId) + } + + // If this is the last pending task, mark the scheduler queue as empty + stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex + val numTasksScheduled = stageIdToTaskIndices(stageId).size + val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1) + if (numTasksScheduled == numTasksTotal) { + // No more pending tasks for this stage + stageIdToNumTasks -= stageId + if (stageIdToNumTasks.isEmpty) { + allocationManager.onSchedulerQueueEmpty() + } } - } - // Mark the executor on which this task is scheduled as busy - executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId - allocationManager.onExecutorBusy(executorId) + // Mark the executor on which this task is scheduled as busy + executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId + allocationManager.onExecutorBusy(executorId) + } } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { val executorId = taskEnd.taskInfo.executorId val taskId = taskEnd.taskInfo.taskId - - // If the executor is no longer running scheduled any tasks, mark it as idle - if (executorIdToTaskIds.contains(executorId)) { - executorIdToTaskIds(executorId) -= taskId - if (executorIdToTaskIds(executorId).isEmpty) { - executorIdToTaskIds -= executorId - allocationManager.onExecutorIdle(executorId) + allocationManager.synchronized { + // If the executor is no longer running scheduled any tasks, mark it as idle + if (executorIdToTaskIds.contains(executorId)) { + executorIdToTaskIds(executorId) -= taskId + if (executorIdToTaskIds(executorId).isEmpty) { + executorIdToTaskIds -= executorId + allocationManager.onExecutorIdle(executorId) + } } } } @@ -466,7 +487,12 @@ private[spark] class ExecutorAllocationManager( override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { val executorId = blockManagerAdded.blockManagerId.executorId if (executorId != SparkContext.DRIVER_IDENTIFIER) { - allocationManager.onExecutorAdded(executorId) + // This guards against the race condition in which the `SparkListenerTaskStart` + // event is posted before the `SparkListenerBlockManagerAdded` event, which is + // possible because these events are posted in different threads. (see SPARK-4951) + if (!allocationManager.executorIds.contains(executorId)) { + allocationManager.onExecutorAdded(executorId) + } } } @@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager( /** * An estimate of the total number of pending tasks remaining for currently running stages. Does * not account for tasks which may have failed and been resubmitted. + * + * Note: This is not thread-safe without the caller owning the `allocationManager` lock. */ def totalPendingTasks(): Int = { stageIdToNumTasks.map { case (stageId, numTasks) => numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) }.sum } + + /** + * Return true if an executor is not currently running a task, and false otherwise. + * + * Note: This is not thread-safe without the caller owning the `allocationManager` lock. + */ + def isExecutorIdle(executorId: String): Boolean = { + !executorIdToTaskIds.contains(executorId) + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 50721b9d6c..f14aaeea0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler.cluster +import scala.concurrent.{Future, ExecutionContext} + import akka.actor.{Actor, ActorRef, Props} import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} @@ -24,7 +26,9 @@ import org.apache.spark.SparkContext import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.ui.JettyUtils -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.{AkkaUtils, Utils} + +import scala.util.control.NonFatal /** * Abstract Yarn scheduler backend that contains common logic @@ -97,6 +101,9 @@ private[spark] abstract class YarnSchedulerBackend( private class YarnSchedulerActor extends Actor { private var amActor: Option[ActorRef] = None + implicit val askAmActorExecutor = ExecutionContext.fromExecutor( + Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-executor")) + override def preStart(): Unit = { // Listen for disassociation events context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -110,7 +117,12 @@ private[spark] abstract class YarnSchedulerBackend( case r: RequestExecutors => amActor match { case Some(actor) => - sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout) + val driverActor = sender + Future { + driverActor ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout) + } onFailure { + case NonFatal(e) => logError(s"Sending $r to AM was unsuccessful", e) + } case None => logWarning("Attempted to request executors before the AM has registered!") sender ! false @@ -119,7 +131,12 @@ private[spark] abstract class YarnSchedulerBackend( case k: KillExecutors => amActor match { case Some(actor) => - sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout) + val driverActor = sender + Future { + driverActor ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout) + } onFailure { + case NonFatal(e) => logError(s"Sending $k to AM was unsuccessful", e) + } case None => logWarning("Attempted to kill executors before the AM has registered!") sender ! false diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index c817f6dced..0e4df17c1b 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark +import scala.collection.mutable + import org.scalatest.{FunSuite, PrivateMethodTester} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ @@ -143,11 +145,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { // Verify that running a task reduces the cap sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) + sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( + 0L, BlockManagerId("executor-1", "host1", 1), 100L)) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + assert(numExecutorsPending(manager) === 4) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 6) + assert(numExecutorsPending(manager) === 5) assert(numExecutorsToAdd(manager) === 2) - assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 2) + assert(numExecutorsPending(manager) === 7) + assert(numExecutorsToAdd(manager) === 4) + assert(addExecutors(manager) === 0) assert(numExecutorsPending(manager) === 7) assert(numExecutorsToAdd(manager) === 1) @@ -325,6 +333,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { val manager = sc.executorAllocationManager.get manager.setClock(clock) + executorIds(manager).asInstanceOf[mutable.Set[String]] ++= List("1", "2", "3") + // Starting remove timer is idempotent for each executor assert(removeTimes(manager).isEmpty) onExecutorIdle(manager, "1") @@ -597,6 +607,41 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(removeTimes(manager).size === 1) } + test("SPARK-4951: call onTaskStart before onBlockManagerAdded") { + sc = createSparkContext(2, 10) + val manager = sc.executorAllocationManager.get + assert(executorIds(manager).isEmpty) + assert(removeTimes(manager).isEmpty) + + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( + 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + assert(executorIds(manager).size === 1) + assert(executorIds(manager).contains("executor-1")) + assert(removeTimes(manager).size === 0) + } + + test("SPARK-4951: onExecutorAdded should not add a busy executor to removeTimes") { + sc = createSparkContext(2, 10) + val manager = sc.executorAllocationManager.get + assert(executorIds(manager).isEmpty) + assert(removeTimes(manager).isEmpty) + sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( + 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + + assert(executorIds(manager).size === 1) + assert(executorIds(manager).contains("executor-1")) + assert(removeTimes(manager).size === 0) + + sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( + 0L, BlockManagerId("executor-2", "host1", 1), 100L)) + assert(executorIds(manager).size === 2) + assert(executorIds(manager).contains("executor-2")) + assert(removeTimes(manager).size === 1) + assert(removeTimes(manager).contains("executor-2")) + assert(!removeTimes(manager).contains("executor-1")) + } } /** |