author     zsxwing <zsxwing@gmail.com>          2015-01-11 16:23:28 -0800
committer  Andrew Or <andrew@databricks.com>    2015-01-11 16:23:56 -0800
commit     056149d10c64900fa15ed01843bf952ceb5708ea (patch)
tree       d4c37f011c8165eb2c73c96ecc160cf1f0948322 /core
parent     f04ff9d37ae312dccd16843df7d0e3a03c30e403 (diff)
[SPARK-4951][Core] Fix the issue that a busy executor may be killed
A few changes to fix this issue:

1. Handle the case where `SparkListenerTaskStart` is received before `SparkListenerBlockManagerAdded`.
2. Don't add `executorId` to `removeTimes` when the executor is busy.
3. Use `HashMap.retain` to safely traverse the HashMap and remove items (sketched below).
4. Use the same lock in ExecutorAllocationManager and ExecutorAllocationListener to fix the race condition in `totalPendingTasks`.
5. Move the blocking code out of the message-processing code in YarnSchedulerActor.

Author: zsxwing <zsxwing@gmail.com>

Closes #3783 from zsxwing/SPARK-4951 and squashes the following commits:

d51fa0d [zsxwing] Add comments
2e365ce [zsxwing] Remove expired executors from 'removeTimes' and add idle executors back when a new executor joins
49f61a9 [zsxwing] Eliminate duplicate executor registered warnings
d4c4e9a [zsxwing] Minor fixes for the code style
05f6238 [zsxwing] Move the blocking codes out of the message processing code
105ba3a [zsxwing] Fix the race condition in totalPendingTasks
d5c615d [zsxwing] Fix the issue that a busy executor may be killed

(cherry picked from commit 6942b974adad396cba2799eac1fa90448cea4da7)
Signed-off-by: Andrew Or <andrew@databricks.com>
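Change 3 is what replaces the unsafe traversal in the scheduling loop: expired entries are now dropped with `HashMap.retain` instead of calling `remove` on the map while iterating over it with `foreach`. A minimal, self-contained Scala sketch of the difference — `removeExecutor` here is only a println stand-in and the timestamps are made up:

import scala.collection.mutable

object RetainSketch {
  def main(args: Array[String]): Unit = {
    // executorId -> expiry time in millis, mirroring `removeTimes` in the patch
    val removeTimes = mutable.HashMap("exec-1" -> 100L, "exec-2" -> 500L)
    val now = 200L

    // Illustrative stand-in for ExecutorAllocationManager.removeExecutor
    def removeExecutor(id: String): Unit = println(s"removing $id")

    // Unsafe shape (what the patch removes): mutating the map while a
    // foreach traversal is walking it can skip entries or otherwise misbehave.
    //
    //   removeTimes.foreach { case (id, expireTime) =>
    //     if (now >= expireTime) { removeExecutor(id); removeTimes.remove(id) }
    //   }

    // Safe shape (what the patch adds): retain drops entries whose predicate
    // returns false in a single pass, so nothing mutates under an iterator.
    removeTimes.retain { case (id, expireTime) =>
      val expired = now >= expireTime
      if (expired) removeExecutor(id)
      !expired
    }

    println(removeTimes) // exec-1 is gone; only exec-2 -> 500 remains
  }
}

The patch applies exactly this retain form to `removeTimes` inside ExecutorAllocationManager.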
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala              | 117
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala |  23
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala         |  49
3 files changed, 144 insertions(+), 45 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index e9e90e3f2f..a0ee2a7cbb 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager(
listenerBus: LiveListenerBus,
conf: SparkConf)
extends Logging {
+
+ allocationManager =>
+
import ExecutorAllocationManager._
// Lower and upper bounds on the number of executors. These are required.
@@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
private var clock: Clock = new RealClock
// Listener for Spark events that impact the allocation policy
- private val listener = new ExecutorAllocationListener(this)
+ private val listener = new ExecutorAllocationListener
/**
* Verify that the settings specified through the config are valid.
@@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager(
addTime += sustainedSchedulerBacklogTimeout * 1000
}
- removeTimes.foreach { case (executorId, expireTime) =>
- if (now >= expireTime) {
+ removeTimes.retain { case (executorId, expireTime) =>
+ val expired = now >= expireTime
+ if (expired) {
removeExecutor(executorId)
- removeTimes.remove(executorId)
}
+ !expired
}
}
@@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager(
// Do not kill the executor if we have already reached the lower bound
val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
if (numExistingExecutors - 1 < minNumExecutors) {
- logInfo(s"Not removing idle executor $executorId because there are only " +
+ logDebug(s"Not removing idle executor $executorId because there are only " +
s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
return false
}
@@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorAdded(executorId: String): Unit = synchronized {
if (!executorIds.contains(executorId)) {
executorIds.add(executorId)
- executorIds.foreach(onExecutorIdle)
+ // If an executor (call this executor X) is not removed because the lower bound
+ // has been reached, it will no longer be marked as idle. When new executors join,
+ // however, we are no longer at the lower bound, and so we must mark executor X
+ // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
+ executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
if (numExecutorsPending > 0) {
numExecutorsPending -= 1
@@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager(
* the executor is not already marked as idle.
*/
private def onExecutorIdle(executorId: String): Unit = synchronized {
- if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
- logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
- s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
- removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+ if (executorIds.contains(executorId)) {
+ if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+ logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+ s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+ }
+ } else {
+ logWarning(s"Attempted to mark unknown executor $executorId idle")
}
}
@@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager(
* and consistency of events returned by the listener. For simplicity, it does not account
* for speculated tasks.
*/
- private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
- extends SparkListener {
+ private class ExecutorAllocationListener extends SparkListener {
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
- synchronized {
- val stageId = stageSubmitted.stageInfo.stageId
- val numTasks = stageSubmitted.stageInfo.numTasks
+ val stageId = stageSubmitted.stageInfo.stageId
+ val numTasks = stageSubmitted.stageInfo.numTasks
+ allocationManager.synchronized {
stageIdToNumTasks(stageId) = numTasks
allocationManager.onSchedulerBacklogged()
}
}
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
- synchronized {
- val stageId = stageCompleted.stageInfo.stageId
+ val stageId = stageCompleted.stageInfo.stageId
+ allocationManager.synchronized {
stageIdToNumTasks -= stageId
stageIdToTaskIndices -= stageId
@@ -426,39 +437,49 @@ private[spark] class ExecutorAllocationManager(
}
}
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
val stageId = taskStart.stageId
val taskId = taskStart.taskInfo.taskId
val taskIndex = taskStart.taskInfo.index
val executorId = taskStart.taskInfo.executorId
- // If this is the last pending task, mark the scheduler queue as empty
- stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
- val numTasksScheduled = stageIdToTaskIndices(stageId).size
- val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
- if (numTasksScheduled == numTasksTotal) {
- // No more pending tasks for this stage
- stageIdToNumTasks -= stageId
- if (stageIdToNumTasks.isEmpty) {
- allocationManager.onSchedulerQueueEmpty()
+ allocationManager.synchronized {
+ // This guards against the race condition in which the `SparkListenerTaskStart`
+ // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+ // possible because these events are posted in different threads. (see SPARK-4951)
+ if (!allocationManager.executorIds.contains(executorId)) {
+ allocationManager.onExecutorAdded(executorId)
+ }
+
+ // If this is the last pending task, mark the scheduler queue as empty
+ stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+ val numTasksScheduled = stageIdToTaskIndices(stageId).size
+ val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+ if (numTasksScheduled == numTasksTotal) {
+ // No more pending tasks for this stage
+ stageIdToNumTasks -= stageId
+ if (stageIdToNumTasks.isEmpty) {
+ allocationManager.onSchedulerQueueEmpty()
+ }
}
- }
- // Mark the executor on which this task is scheduled as busy
- executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
- allocationManager.onExecutorBusy(executorId)
+ // Mark the executor on which this task is scheduled as busy
+ executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+ allocationManager.onExecutorBusy(executorId)
+ }
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
-
- // If the executor is no longer running any scheduled tasks, mark it as idle
- if (executorIdToTaskIds.contains(executorId)) {
- executorIdToTaskIds(executorId) -= taskId
- if (executorIdToTaskIds(executorId).isEmpty) {
- executorIdToTaskIds -= executorId
- allocationManager.onExecutorIdle(executorId)
+ allocationManager.synchronized {
+ // If the executor is no longer running any scheduled tasks, mark it as idle
+ if (executorIdToTaskIds.contains(executorId)) {
+ executorIdToTaskIds(executorId) -= taskId
+ if (executorIdToTaskIds(executorId).isEmpty) {
+ executorIdToTaskIds -= executorId
+ allocationManager.onExecutorIdle(executorId)
+ }
}
}
}
@@ -466,7 +487,12 @@ private[spark] class ExecutorAllocationManager(
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
val executorId = blockManagerAdded.blockManagerId.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
- allocationManager.onExecutorAdded(executorId)
+ // This guards against the race condition in which the `SparkListenerTaskStart`
+ // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+ // possible because these events are posted in different threads. (see SPARK-4951)
+ if (!allocationManager.executorIds.contains(executorId)) {
+ allocationManager.onExecutorAdded(executorId)
+ }
}
}
@@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager(
/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
+ *
+ * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def totalPendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}
+
+ /**
+ * Return true if an executor is not currently running a task, and false otherwise.
+ *
+ * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
+ */
+ def isExecutorIdle(executorId: String): Boolean = {
+ !executorIdToTaskIds.contains(executorId)
+ }
}
}
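Taken together, the listener changes above funnel every callback through the manager's own monitor: the new `allocationManager =>` self-alias lets the nested ExecutorAllocationListener write `allocationManager.synchronized`, so the event-bus thread and the thread that reads `totalPendingTasks()` during scheduling can no longer interleave. A stripped-down sketch of that locking shape, with illustrative class and member names rather than Spark's:

import scala.collection.mutable

class Manager {
  manager =>  // self-alias so the nested listener can name the outer lock

  private val pendingByStage = mutable.HashMap[Int, Int]()

  // Called from a scheduling thread; must see a consistent view of listener state.
  def totalPending(): Int = manager.synchronized {
    pendingByStage.values.sum
  }

  // Nested listener, invoked from the event-bus thread.
  class Listener {
    def onStageSubmitted(stageId: Int, numTasks: Int): Unit =
      // Lock the *outer* object, not the listener itself, so both threads
      // contend for the same monitor.
      manager.synchronized {
        pendingByStage(stageId) = numTasks
      }
  }

  val listener = new Listener
}

Synchronizing inside the listener methods, as the old code did, only locked the listener instance, so a concurrent read of the task-count maps from the manager's side could still observe them mid-update.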
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 50721b9d6c..f14aaeea0a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler.cluster
+import scala.concurrent.{Future, ExecutionContext}
+
import akka.actor.{Actor, ActorRef, Props}
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
@@ -24,7 +26,9 @@ import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+import scala.util.control.NonFatal
/**
* Abstract Yarn scheduler backend that contains common logic
@@ -97,6 +101,9 @@ private[spark] abstract class YarnSchedulerBackend(
private class YarnSchedulerActor extends Actor {
private var amActor: Option[ActorRef] = None
+ implicit val askAmActorExecutor = ExecutionContext.fromExecutor(
+ Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-executor"))
+
override def preStart(): Unit = {
// Listen for disassociation events
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -110,7 +117,12 @@ private[spark] abstract class YarnSchedulerBackend(
case r: RequestExecutors =>
amActor match {
case Some(actor) =>
- sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+ val driverActor = sender
+ Future {
+ driverActor ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+ } onFailure {
+ case NonFatal(e) => logError(s"Sending $r to AM was unsuccessful", e)
+ }
case None =>
logWarning("Attempted to request executors before the AM has registered!")
sender ! false
@@ -119,7 +131,12 @@ private[spark] abstract class YarnSchedulerBackend(
case k: KillExecutors =>
amActor match {
case Some(actor) =>
- sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+ val driverActor = sender
+ Future {
+ driverActor ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+ } onFailure {
+ case NonFatal(e) => logError(s"Sending $k to AM was unsuccessful", e)
+ }
case None =>
logWarning("Attempted to kill executors before the AM has registered!")
sender ! false
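The YarnSchedulerActor change keeps the actor responsive: the blocking `AkkaUtils.askWithReply` now runs on the dedicated "yarn-scheduler-ask-am-executor" pool and the reply is sent from the Future, with `sender` captured into a local `val driverActor` first, since Akka's `sender` is only valid while the actor is processing the current message. A rough standard-library-only sketch of the same pattern — `blockingAskAm`, `handle`, and `reply` are made-up names for illustration:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object NonBlockingHandlerSketch {
  // Dedicated pool for blocking calls, standing in for "yarn-scheduler-ask-am-executor"
  private val pool = Executors.newCachedThreadPool()
  implicit val askExecutor: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Stand-in for the blocking ask to the AM (illustrative only)
  def blockingAskAm(request: String): Boolean = { Thread.sleep(100); true }

  // Stand-in for a message handler: capture the reply target up front, then
  // run the blocking ask on the dedicated pool so the caller returns at once.
  def handle(request: String, reply: Boolean => Unit): Unit = {
    Future {
      reply(blockingAskAm(request))
    } onFailure {
      case NonFatal(e) => println(s"Sending $request to AM was unsuccessful: $e")
    }
  }

  def main(args: Array[String]): Unit = {
    handle("RequestExecutors(2)", granted => println(s"driver notified: $granted"))
    println("handler returned immediately; message loop stays responsive")
    Thread.sleep(500)   // demo only: give the future time to run
    pool.shutdown()
  }
}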
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index ce804f94f3..4fa3b34412 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import scala.collection.mutable
+
import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -142,11 +144,17 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
// Verify that running a task reduces the cap
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
+ sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+ 0L, BlockManagerId("executor-1", "host1", 1), 100L))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+ assert(numExecutorsPending(manager) === 4)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsPending(manager) === 5)
assert(numExecutorsToAdd(manager) === 2)
- assert(addExecutors(manager) === 1)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 0)
assert(numExecutorsPending(manager) === 7)
assert(numExecutorsToAdd(manager) === 1)
@@ -324,6 +332,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
+ executorIds(manager).asInstanceOf[mutable.Set[String]] ++= List("1", "2", "3")
+
// Starting remove timer is idempotent for each executor
assert(removeTimes(manager).isEmpty)
onExecutorIdle(manager, "1")
@@ -596,6 +606,41 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).size === 1)
}
+ test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
+ sc = createSparkContext(2, 10)
+ val manager = sc.executorAllocationManager.get
+ assert(executorIds(manager).isEmpty)
+ assert(removeTimes(manager).isEmpty)
+
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+ sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+ 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ assert(executorIds(manager).size === 1)
+ assert(executorIds(manager).contains("executor-1"))
+ assert(removeTimes(manager).size === 0)
+ }
+
+ test("SPARK-4951: onExecutorAdded should not add a busy executor to removeTimes") {
+ sc = createSparkContext(2, 10)
+ val manager = sc.executorAllocationManager.get
+ assert(executorIds(manager).isEmpty)
+ assert(removeTimes(manager).isEmpty)
+ sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+ 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+
+ assert(executorIds(manager).size === 1)
+ assert(executorIds(manager).contains("executor-1"))
+ assert(removeTimes(manager).size === 0)
+
+ sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+ 0L, BlockManagerId("executor-2", "host1", 1), 100L))
+ assert(executorIds(manager).size === 2)
+ assert(executorIds(manager).contains("executor-2"))
+ assert(removeTimes(manager).size === 1)
+ assert(removeTimes(manager).contains("executor-2"))
+ assert(!removeTimes(manager).contains("executor-1"))
+ }
}
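The two new regression tests above boil down to two invariants: executor registration is idempotent even when `SparkListenerTaskStart` arrives before `SparkListenerBlockManagerAdded`, and an executor with running tasks never gets an idle-removal timer. A toy model of that bookkeeping — all names and the 60-second timeout are illustrative, not Spark's actual implementation:

import scala.collection.mutable

object IdleTrackingSketch {
  val executorIds  = mutable.HashSet[String]()
  val runningTasks = mutable.HashMap[String, Int]().withDefaultValue(0)
  val removeTimes  = mutable.HashMap[String, Long]()

  def onExecutorAdded(id: String, now: Long): Unit =
    if (executorIds.add(id)) {          // idempotent: a second add is a no-op
      if (runningTasks(id) == 0) removeTimes(id) = now + 60000L  // idle timer
    }

  def onTaskStart(id: String, now: Long): Unit = {
    onExecutorAdded(id, now)            // TaskStart may arrive first (SPARK-4951)
    runningTasks(id) += 1
    removeTimes -= id                   // busy executors carry no idle timer
  }

  def main(args: Array[String]): Unit = {
    onTaskStart("executor-1", now = 0L)      // before any "block manager added"
    onExecutorAdded("executor-1", now = 1L)  // late registration is ignored
    onExecutorAdded("executor-2", now = 2L)
    println(executorIds)   // contains executor-1 and executor-2, each once
    println(removeTimes)   // only executor-2 has a timer; busy executor-1 does not
  }
}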
/**