author      Imran Rashid <irashid@cloudera.com>   2016-12-15 08:29:56 -0600
committer   Imran Rashid <irashid@cloudera.com>   2016-12-15 08:29:56 -0600
commit      93cdb8a7d0f124b4db069fd8242207c82e263c52 (patch)
tree        c0f626664bfa6bad965b85a3cc54438bf15b4332
parent      7d858bc5ce870a28a559f4e81dcfc54cbd128cb7 (diff)
download    spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.gz
            spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.tar.bz2
            spark-93cdb8a7d0f124b4db069fd8242207c82e263c52.zip
[SPARK-8425][CORE] Application Level Blacklisting
## What changes were proposed in this pull request?

This builds upon the blacklisting introduced in SPARK-17675 to add blacklisting of executors and nodes for an entire Spark application. Resources are blacklisted based on tasks that fail in task sets that eventually complete successfully; they are automatically returned to the pool of active resources after a timeout. Full details are available in a design doc attached to the JIRA.

## How was this patch tested?

Added unit tests, ran them via Jenkins, and also ran a handful of them in a loop to check for flakiness. The added tests include:

- verifying BlacklistTracker works correctly
- verifying TaskSchedulerImpl interacts with BlacklistTracker correctly (via a mock BlacklistTracker)
- an integration test for the entire scheduler with blacklisting in a few different scenarios

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes #14079 from squito/blacklist-SPARK-8425.
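For reference, the feature is driven entirely by configuration. Below is a minimal sketch (not part of the patch) of how an application might enable it, assuming the pre-existing `spark.blacklist.enabled` flag and the blacklist timeout key described in docs/configuration.md; the two `application.*` keys are the ones introduced by this patch, and all values are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: enable blacklisting and the new application-level limits.
val conf = new SparkConf()
  .setMaster("local[*]")          // illustrative master; use your cluster manager in practice
  .setAppName("blacklist-demo")
  .set("spark.blacklist.enabled", "true")
  // New in this patch: blacklist an executor after 2 task failures in successful task sets,
  // and blacklist a node once 2 of its executors have been blacklisted.
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
  // Assumed timeout key (see docs/configuration.md); blacklisted resources return after this period.
  .set("spark.blacklist.timeout", "1h")

val sc = new SparkContext(conf)
```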
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala | 272
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 55
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala | 352
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 109
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 49
-rw-r--r--  docs/configuration.md | 30
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 21
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 12
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 33
-rw-r--r--  yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala | 57
18 files changed, 1007 insertions(+), 76 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index fb62682b6c..aba429bcdc 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -114,11 +114,21 @@ package object config {
.intConf
.createWithDefault(2)
+ private[spark] val MAX_FAILURES_PER_EXEC =
+ ConfigBuilder("spark.blacklist.application.maxFailedTasksPerExecutor")
+ .intConf
+ .createWithDefault(2)
+
private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
.intConf
.createWithDefault(2)
+ private[spark] val MAX_FAILED_EXEC_PER_NODE =
+ ConfigBuilder("spark.blacklist.application.maxFailedExecutorsPerNode")
+ .intConf
+ .createWithDefault(2)
+
private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
.intConf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index fca4c6d37e..bf7a62ea33 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -17,10 +17,274 @@
package org.apache.spark.scheduler
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting
+ * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add
+ * additional blacklisting of executors and nodes for individual tasks and stages which works in
+ * concert with the blacklisting here.
+ *
+ * The tracker needs to deal with a variety of workloads, e.g.:
+ *
+ * * bad user code -- this may lead to many task failures, but that should not count against
+ * individual executors
+ * * many small stages -- this may prevent a bad executor from having many failures within one
+ * stage, while it still accumulates many failures over the entire application
+ * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit
+ * blacklisting
+ *
+ * See the design doc on SPARK-8425 for a more in-depth discussion.
+ *
+ * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is
+ * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The
+ * one exception is [[nodeBlacklist()]], which can be called without holding a lock.
+ */
+private[scheduler] class BlacklistTracker (
+ conf: SparkConf,
+ clock: Clock = new SystemClock()) extends Logging {
+
+ BlacklistTracker.validateBlacklistConfs(conf)
+ private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
+ private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
+ val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
+
+ /**
+ * A map from executorId to information on task failures. Tracks the time of each task failure,
+ * so that we can avoid blacklisting executors due to failures that are very far apart. We do not
+ * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take
+ * to do so. But it will not grow too large, because as soon as an executor gets too many
+ * failures, we blacklist the executor and remove its entry here.
+ */
+ private val executorIdToFailureList = new HashMap[String, ExecutorFailureList]()
+ val executorIdToBlacklistStatus = new HashMap[String, BlacklistedExecutor]()
+ val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]()
+ /**
+ * An immutable copy of the set of nodes that are currently blacklisted. Kept in an
+ * AtomicReference to make [[nodeBlacklist()]] thread-safe.
+ */
+ private val _nodeBlacklist = new AtomicReference[Set[String]](Set())
+ /**
+ * Time when the next blacklist will expire. Used as a
+ * shortcut to avoid iterating over all entries in the blacklist when none will have expired.
+ */
+ var nextExpiryTime: Long = Long.MaxValue
+ /**
+ * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not*
+ * remove from this when executors are removed from spark, so we can track when we get multiple
+ * successive blacklisted executors on one node. Nonetheless, it will not grow too large because
+ * there cannot be many blacklisted executors on one node, before we stop requesting more
+ * executors on that node, and we clean up the list of blacklisted executors once an executor has
+ * been blacklisted for BLACKLIST_TIMEOUT_MILLIS.
+ */
+ val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()
+
+ /**
+ * Un-blacklists executors and nodes that have been blacklisted for at least
+ * BLACKLIST_TIMEOUT_MILLIS
+ */
+ def applyBlacklistTimeout(): Unit = {
+ val now = clock.getTimeMillis()
+ // quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work
+ if (now > nextExpiryTime) {
+ // Apply the timeout to blacklisted nodes and executors
+ val execsToUnblacklist = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys
+ if (execsToUnblacklist.nonEmpty) {
+ // Un-blacklist any executors that have been blacklisted longer than the blacklist timeout.
+ logInfo(s"Removing executors $execsToUnblacklist from blacklist because the blacklist " +
+ s"for those executors has timed out")
+ execsToUnblacklist.foreach { exec =>
+ val status = executorIdToBlacklistStatus.remove(exec).get
+ val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+ failedExecsOnNode.remove(exec)
+ if (failedExecsOnNode.isEmpty) {
+ nodeToBlacklistedExecs.remove(status.node)
+ }
+ }
+ }
+ val nodesToUnblacklist = nodeIdToBlacklistExpiryTime.filter(_._2 < now).keys
+ if (nodesToUnblacklist.nonEmpty) {
+ // Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
+ logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
+ s"has timed out")
+ nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
+ _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+ }
+ updateNextExpiryTime()
+ }
+ }
+
+ private def updateNextExpiryTime(): Unit = {
+ val execMinExpiry = if (executorIdToBlacklistStatus.nonEmpty) {
+ executorIdToBlacklistStatus.map{_._2.expiryTime}.min
+ } else {
+ Long.MaxValue
+ }
+ val nodeMinExpiry = if (nodeIdToBlacklistExpiryTime.nonEmpty) {
+ nodeIdToBlacklistExpiryTime.values.min
+ } else {
+ Long.MaxValue
+ }
+ nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
+ }
+
+
+ def updateBlacklistForSuccessfulTaskSet(
+ stageId: Int,
+ stageAttemptId: Int,
+ failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
+ // if any tasks failed, we count them towards the overall failure count for the executor at
+ // this point.
+ val now = clock.getTimeMillis()
+ failuresByExec.foreach { case (exec, failuresInTaskSet) =>
+ val appFailuresOnExecutor =
+ executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
+ appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
+ appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
+ val newTotal = appFailuresOnExecutor.numUniqueTaskFailures
+
+ val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+ // If this pushes the total number of failures over the threshold, blacklist the executor.
+ // If it's already blacklisted, we avoid "re-blacklisting" (which can happen if there were
+ // other tasks already running in another taskset when it got blacklisted), because it makes
+ // some of the logic around expiry times a little more confusing. But it also wouldn't be a
+ // problem to re-blacklist, with a later expiry time.
+ if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
+ logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
+ s" task failures in successful task sets")
+ val node = failuresInTaskSet.node
+ executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+ updateNextExpiryTime()
+
+ // In addition to blacklisting the executor, we also update the data for failures on the
+ // node, and potentially put the entire node into a blacklist as well.
+ val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
+ blacklistedExecsOnNode += exec
+ // If the node is already in the blacklist, we avoid adding it again with a later expiry
+ // time.
+ if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
+ !nodeIdToBlacklistExpiryTime.contains(node)) {
+ logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
+ s"executors blacklisted: ${blacklistedExecsOnNode}")
+ nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+ _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+ }
+ }
+ }
+ }
+
+ def isExecutorBlacklisted(executorId: String): Boolean = {
+ executorIdToBlacklistStatus.contains(executorId)
+ }
+
+ /**
+ * Get the full set of nodes that are blacklisted. Unlike other methods in this class, this *IS*
+ * thread-safe -- no lock required on a taskScheduler.
+ */
+ def nodeBlacklist(): Set[String] = {
+ _nodeBlacklist.get()
+ }
+
+ def isNodeBlacklisted(node: String): Boolean = {
+ nodeIdToBlacklistExpiryTime.contains(node)
+ }
+
+ def handleRemovedExecutor(executorId: String): Unit = {
+ // We intentionally do not clean up executors that are already blacklisted in
+ // nodeToBlacklistedExecs, so that if another executor on the same node gets blacklisted, we can
+ // blacklist the entire node. We also can't clean up executorIdToBlacklistStatus, so we can
+ // eventually remove the executor after the timeout. Despite not clearing those structures
+ // here, we don't expect they will grow too big since you won't get too many executors on one
+ // node, and the timeout will clear it up periodically in any case.
+ executorIdToFailureList -= executorId
+ }
+
+
+ /**
+ * Tracks all failures for one executor (that have not passed the timeout).
+ *
+ * In general we actually expect this to be extremely small, since it won't contain more than the
+ * maximum number of task failures before an executor is blacklisted (default 2).
+ */
+ private[scheduler] final class ExecutorFailureList extends Logging {
+
+ private case class TaskId(stage: Int, stageAttempt: Int, taskIndex: Int)
+
+ /**
+ * All failures on this executor in successful task sets.
+ */
+ private var failuresAndExpiryTimes = ArrayBuffer[(TaskId, Long)]()
+ /**
+ * As an optimization, we track the min expiry time over all entries in failuresAndExpiryTimes
+ * so it's quick to tell if there are any failures with expiry before the current time.
+ */
+ private var minExpiryTime = Long.MaxValue
+
+ def addFailures(
+ stage: Int,
+ stageAttempt: Int,
+ failuresInTaskSet: ExecutorFailuresInTaskSet): Unit = {
+ failuresInTaskSet.taskToFailureCountAndFailureTime.foreach {
+ case (taskIdx, (_, failureTime)) =>
+ val expiryTime = failureTime + BLACKLIST_TIMEOUT_MILLIS
+ failuresAndExpiryTimes += ((TaskId(stage, stageAttempt, taskIdx), expiryTime))
+ if (expiryTime < minExpiryTime) {
+ minExpiryTime = expiryTime
+ }
+ }
+ }
+
+ /**
+ * The number of unique tasks that failed on this executor. Only counts failures within the
+ * timeout, and in successful tasksets.
+ */
+ def numUniqueTaskFailures: Int = failuresAndExpiryTimes.size
+
+ def isEmpty: Boolean = failuresAndExpiryTimes.isEmpty
+
+ /**
+ * Apply the timeout to individual tasks. This is to prevent one-off failures that are very
+ * spread out in time (and likely have nothing to do with problems on the executor) from
+ * triggering blacklisting. However, note that we do *not* remove executors and nodes from
+ * the blacklist as we expire individual task failures -- each has its own timeout. E.g.,
+ * suppose:
+ * * timeout = 10, maxFailuresPerExec = 2
+ * * Task 1 fails on exec 1 at time 0
+ * * Task 2 fails on exec 1 at time 5
+ * --> exec 1 is blacklisted from time 5 - 15.
+ * This is to simplify the implementation, as well as keep the behavior easier to understand
+ * for the end user.
+ */
+ def dropFailuresWithTimeoutBefore(dropBefore: Long): Unit = {
+ if (minExpiryTime < dropBefore) {
+ var newMinExpiry = Long.MaxValue
+ val newFailures = new ArrayBuffer[(TaskId, Long)]
+ failuresAndExpiryTimes.foreach { case (task, expiryTime) =>
+ if (expiryTime >= dropBefore) {
+ newFailures += ((task, expiryTime))
+ if (expiryTime < newMinExpiry) {
+ newMinExpiry = expiryTime
+ }
+ }
+ }
+ failuresAndExpiryTimes = newFailures
+ minExpiryTime = newMinExpiry
+ }
+ }
+
+ override def toString(): String = {
+ s"failures = $failuresAndExpiryTimes"
+ }
+ }
+
+}
private[scheduler] object BlacklistTracker extends Logging {
@@ -80,7 +344,9 @@ private[scheduler] object BlacklistTracker extends Logging {
config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
config.MAX_TASK_ATTEMPTS_PER_NODE,
config.MAX_FAILURES_PER_EXEC_STAGE,
- config.MAX_FAILED_EXEC_PER_NODE_STAGE
+ config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+ config.MAX_FAILURES_PER_EXEC,
+ config.MAX_FAILED_EXEC_PER_NODE
).foreach { config =>
val v = conf.get(config)
if (v <= 0) {
@@ -112,3 +378,5 @@ private[scheduler] object BlacklistTracker extends Logging {
}
}
}
+
+private final case class BlacklistedExecutor(node: String, expiryTime: Long)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
index 20ab27d127..70553d8be2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
@@ -25,26 +25,30 @@ import scala.collection.mutable.HashMap
private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
/**
* Mapping from index of the tasks in the taskset, to the number of times it has failed on this
- * executor.
+ * executor and the most recent failure time.
*/
- val taskToFailureCount = HashMap[Int, Int]()
+ val taskToFailureCountAndFailureTime = HashMap[Int, (Int, Long)]()
- def updateWithFailure(taskIndex: Int): Unit = {
- val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
- taskToFailureCount(taskIndex) = prevFailureCount + 1
+ def updateWithFailure(taskIndex: Int, failureTime: Long): Unit = {
+ val (prevFailureCount, prevFailureTime) =
+ taskToFailureCountAndFailureTime.getOrElse(taskIndex, (0, -1L))
+ // these times always come from the driver, so we don't need to worry about skew, but might
+ // as well still be defensive in case there is non-monotonicity in the clock
+ val newFailureTime = math.max(prevFailureTime, failureTime)
+ taskToFailureCountAndFailureTime(taskIndex) = (prevFailureCount + 1, newFailureTime)
}
- def numUniqueTasksWithFailures: Int = taskToFailureCount.size
+ def numUniqueTasksWithFailures: Int = taskToFailureCountAndFailureTime.size
/**
* Return the number of times this executor has failed on the given task index.
*/
def getNumTaskFailures(index: Int): Int = {
- taskToFailureCount.getOrElse(index, 0)
+ taskToFailureCountAndFailureTime.getOrElse(index, (0, 0))._1
}
override def toString(): String = {
s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
- s"tasksToFailureCount = $taskToFailureCount"
+ s"tasksToFailureCount = $taskToFailureCountAndFailureTime"
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b03cfe4f0d..9a8e313f9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,13 +51,28 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
-private[spark] class TaskSchedulerImpl(
+private[spark] class TaskSchedulerImpl private[scheduler](
val sc: SparkContext,
val maxTaskFailures: Int,
+ blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
- def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
+
+ def this(sc: SparkContext) = {
+ this(
+ sc,
+ sc.conf.get(config.MAX_TASK_FAILURES),
+ TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf))
+ }
+
+ def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
+ this(
+ sc,
+ maxTaskFailures,
+ TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf),
+ isLocal = isLocal)
+ }
val conf = sc.conf
@@ -209,7 +224,7 @@ private[spark] class TaskSchedulerImpl(
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
- new TaskSetManager(this, taskSet, maxTaskFailures)
+ new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
}
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -256,6 +271,8 @@ private[spark] class TaskSchedulerImpl(
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
+ // nodes and executors that are blacklisted for the entire application have already been
+ // filtered out by this point
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
@@ -308,8 +325,20 @@ private[spark] class TaskSchedulerImpl(
}
}
+ // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
+ // this here to avoid a separate thread and added synchronization overhead, and also because
+ // updating the blacklist is only relevant when task offers are being made.
+ blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
+
+ val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
+ offers.filter { offer =>
+ !blacklistTracker.isNodeBlacklisted(offer.host) &&
+ !blacklistTracker.isExecutorBlacklisted(offer.executorId)
+ }
+ }.getOrElse(offers)
+
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
- val shuffledOffers = Random.shuffle(offers)
+ val shuffledOffers = Random.shuffle(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -574,6 +603,7 @@ private[spark] class TaskSchedulerImpl(
executorIdToHost -= executorId
rootPool.executorLost(executorId, host, reason)
}
+ blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
}
def executorAdded(execId: String, host: String) {
@@ -600,6 +630,14 @@ private[spark] class TaskSchedulerImpl(
executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}
+ /**
+ * Get a snapshot of the currently blacklisted nodes for the entire application. This is
+ * thread-safe -- it can be called without a lock on the TaskScheduler.
+ */
+ def nodeBlacklist(): scala.collection.immutable.Set[String] = {
+ blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
+ }
+
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
@@ -678,4 +716,13 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
+
+ private def maybeCreateBlacklistTracker(conf: SparkConf): Option[BlacklistTracker] = {
+ if (BlacklistTracker.isBlacklistEnabled(conf)) {
+ Some(new BlacklistTracker(conf))
+ } else {
+ None
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
index f4b0f55b76..e815b7e0cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -28,6 +28,10 @@ import org.apache.spark.util.Clock
* (task, executor) / (task, nodes) pairs, and also completely blacklisting executors and nodes
* for the entire taskset.
*
+ * It also must store sufficient information in task failures for application level blacklisting,
+ * which is handled by [[BlacklistTracker]]. Note that BlacklistTracker does not know anything
+ * about task failures until a taskset completes successfully.
+ *
* THREADING: This class is a helper to [[TaskSetManager]]; as with the methods in
* [[TaskSetManager]] this class is designed only to be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
@@ -41,7 +45,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)
/**
- * A map from each executor to the task failures on that executor.
+ * A map from each executor to the task failures on that executor. This is used for blacklisting
+ * within this taskset, and it is also relayed onto [[BlacklistTracker]] for app-level
+ * blacklisting if this taskset completes successfully.
*/
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
@@ -57,9 +63,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
/**
* Return true if this executor is blacklisted for the given task. This does *not*
- * need to return true if the executor is blacklisted for the entire stage.
- * That is to keep this method as fast as possible in the inner-loop of the
- * scheduler, where those filters will have already been applied.
+ * need to return true if the executor is blacklisted for the entire stage, or blacklisted
+ * for the entire application. That is to keep this method as fast as possible in the inner-loop
+ * of the scheduler, where those filters will have already been applied.
*/
def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
execToFailures.get(executorId).exists { execFailures =>
@@ -72,10 +78,10 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
}
/**
- * Return true if this executor is blacklisted for the given stage. Completely ignores
- * anything to do with the node the executor is on. That
- * is to keep this method as fast as possible in the inner-loop of the scheduler, where those
- * filters will already have been applied.
+ * Return true if this executor is blacklisted for the given stage. Completely ignores whether
+ * the executor is blacklisted for the entire application (or anything to do with the node the
+ * executor is on). That is to keep this method as fast as possible in the inner-loop of the
+ * scheduler, where those filters will already have been applied.
*/
def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
blacklistedExecs.contains(executorId)
@@ -90,7 +96,7 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
exec: String,
index: Int): Unit = {
val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
- execFailures.updateWithFailure(index)
+ execFailures.updateWithFailure(index, clock.getTimeMillis())
// check if this task has also failed on other executors on the same host -- if its gone
// over the limit, blacklist this task from the entire host.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index f2a432cad3..3756c216f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -51,6 +51,7 @@ private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
+ blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
private val conf = sched.sc.conf
@@ -85,10 +86,8 @@ private[spark] class TaskSetManager(
var calculatedTasks = 0
private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
- if (BlacklistTracker.isBlacklistEnabled(conf)) {
- Some(new TaskSetBlacklist(conf, stageId, clock))
- } else {
- None
+ blacklistTracker.map { _ =>
+ new TaskSetBlacklist(conf, stageId, clock)
}
}
@@ -487,6 +486,12 @@ private[spark] class TaskSetManager(
private def maybeFinishTaskSet() {
if (isZombie && runningTasks == 0) {
sched.taskSetFinished(this)
+ if (tasksSuccessful == numTasks) {
+ blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
+ taskSet.stageId,
+ taskSet.stageAttemptId,
+ taskSetBlacklistHelperOpt.get.execToFailures))
+ }
}
}
@@ -589,6 +594,7 @@ private[spark] class TaskSetManager(
private[scheduler] def abortIfCompletelyBlacklisted(
hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+ val appBlacklist = blacklistTracker.get
// Only look for unschedulable tasks when at least one executor has registered. Otherwise,
// task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
if (hostToExecutors.nonEmpty) {
@@ -615,13 +621,15 @@ private[spark] class TaskSetManager(
val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
// Check if the task can run on the node
val nodeBlacklisted =
- taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
- taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
+ appBlacklist.isNodeBlacklisted(host) ||
+ taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
+ taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
if (nodeBlacklisted) {
true
} else {
// Check if the task can run on any of the executors
execsOnHost.forall { exec =>
+ appBlacklist.isExecutorBlacklisted(exec) ||
taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) ||
taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0a4f19d760..0280359809 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -99,7 +99,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RequestExecutors(
requestedTotal: Int,
localityAwareTasks: Int,
- hostToLocalTaskCount: Map[String, Int])
+ hostToLocalTaskCount: Map[String, Int],
+ nodeBlacklist: Set[String])
extends CoarseGrainedClusterMessage
// Check if an executor was force-killed but for a reason unrelated to the running tasks.
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 915d7a1b8b..7b6a2313f9 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(
protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
- RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+ RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
}
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
@@ -291,7 +291,7 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin
def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RequestExecutors(requestedTotal, _, _) =>
+ case RequestExecutors(requestedTotal, _, _, _) =>
targetNumExecutors = requestedTotal
context.reply(true)
case KillExecutors(executorIds) =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index b2e7ec5df0..6b314d2ae3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -17,10 +17,356 @@
package org.apache.spark.scheduler
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark._
import org.apache.spark.internal.config
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
+ with LocalSparkContext {
+
+ private val clock = new ManualClock(0)
+
+ private var blacklist: BlacklistTracker = _
+ private var scheduler: TaskSchedulerImpl = _
+ private var conf: SparkConf = _
+
+ override def beforeEach(): Unit = {
+ conf = new SparkConf().setAppName("test").setMaster("local")
+ .set(config.BLACKLIST_ENABLED.key, "true")
+ scheduler = mockTaskSchedWithConf(conf)
+
+ clock.setTime(0)
+ blacklist = new BlacklistTracker(conf, clock)
+ }
+
+ override def afterEach(): Unit = {
+ if (blacklist != null) {
+ blacklist = null
+ }
+ if (scheduler != null) {
+ scheduler.stop()
+ scheduler = null
+ }
+ super.afterEach()
+ }
+
+ // All executor and host ids used in tests should be in this set, so that [[assertEquivalentToSet]]
+ // actually exercises them. It's OK if it has extraneous entries.
+ val allExecutorAndHostIds = {
+ (('A' to 'Z') ++ (1 to 100).map(_.toString))
+ .flatMap { suffix =>
+ Seq(s"$suffix", s"host$suffix", s"host-$suffix")
+ }
+ }.toSet
+
+ /**
+ * It's easier to write our tests as if we could directly look at the sets of nodes & executors in
+ * the blacklist. However, the API doesn't expose a set, so this is a simple way to test
+ * something similar, since we know the universe of values that might appear in these sets.
+ */
+ def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = {
+ allExecutorAndHostIds.foreach { id =>
+ val actual = f(id)
+ val exp = expected.contains(id)
+ assert(actual === exp, raw"""for string "$id" """)
+ }
+ }
-class BlacklistTrackerSuite extends SparkFunSuite {
+ def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = {
+ sc = new SparkContext(conf)
+ val scheduler = mock[TaskSchedulerImpl]
+ when(scheduler.sc).thenReturn(sc)
+ when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker)
+ scheduler
+ }
+
+ def createTaskSetBlacklist(stageId: Int = 0): TaskSetBlacklist = {
+ new TaskSetBlacklist(conf, stageId, clock)
+ }
+
+ test("executors can be blacklisted with only a few failures per stage") {
+ // For many different stages, executor 1 fails a task, then executor 2 succeeds the task,
+ // and then the task set is done. Not enough failures to blacklist the executor *within*
+ // any particular taskset, but we still blacklist the executor overall eventually.
+ // Also, we intentionally have a mix of task successes and failures -- there are even some
+ // successes after the executor is blacklisted. The idea here is those tasks get scheduled
+ // before the executor is blacklisted. We might get successes after blacklisting (because the
+ // executor might be flaky but not totally broken). But successes should not unblacklist the
+ // executor.
+ val failuresUntilBlacklisted = conf.get(config.MAX_FAILURES_PER_EXEC)
+ var failuresSoFar = 0
+ (0 until failuresUntilBlacklisted * 10).foreach { stageId =>
+ val taskSetBlacklist = createTaskSetBlacklist(stageId)
+ if (stageId % 2 == 0) {
+ // fail one task in every other taskset
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ failuresSoFar += 1
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
+ assert(failuresSoFar == stageId / 2 + 1)
+ if (failuresSoFar < failuresUntilBlacklisted) {
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ } else {
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ }
+ }
+ }
+
+ // If an executor has many task failures, but the task set ends up failing, those failures
+ // shouldn't be counted against the executor.
+ test("executors aren't blacklisted as a result of tasks in failed task sets") {
+ val failuresUntilBlacklisted = conf.get(config.MAX_FAILURES_PER_EXEC)
+ // for many different stages, executor 1 fails a task, and then the taskSet fails.
+ (0 until failuresUntilBlacklisted * 10).foreach { stage =>
+ val taskSetBlacklist = createTaskSetBlacklist(stage)
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ }
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+
+ Seq(true, false).foreach { succeedTaskSet =>
+ val label = if (succeedTaskSet) "success" else "failure"
+ test(s"stage blacklist updates correctly on stage $label") {
+ // Within one taskset, an executor fails a few times, so it's blacklisted for the taskset.
+ // But if the taskset fails, we shouldn't blacklist the executor after the stage.
+ val taskSetBlacklist = createTaskSetBlacklist(0)
+ // We trigger enough failures for both the taskset blacklist, and the application blacklist.
+ val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
+ conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
+ (0 until numFailures).foreach { index =>
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
+ }
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ if (succeedTaskSet) {
+ // The task set succeeded elsewhere, so we should count those failures against our executor,
+ // and it should be blacklisted for the entire application.
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ } else {
+ // The task set failed, so we don't count these failures against the executor for other
+ // stages.
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+ }
+ }
+
+ test("blacklisted executors and nodes get recovered with time") {
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+ // application.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+ assert(blacklist.nodeBlacklist() === Set())
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+ // application. Since that's the second executor that is blacklisted on the same node, we also
+ // blacklist that node.
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+ }
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
+ assert(blacklist.nodeBlacklist() === Set("hostA"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
+
+ // Advance the clock and then make sure hostA and executors 1 and 2 have been removed from the
+ // blacklist.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ blacklist.applyBlacklistTimeout()
+ assert(blacklist.nodeBlacklist() === Set())
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+
+ // Fail one more task, but executor isn't put back into blacklist since the count of failures
+ // on that executor should have been reset to 0.
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
+ assert(blacklist.nodeBlacklist() === Set())
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+
+ test("blacklist can handle lost executors") {
+ // The blacklist should still work if an executor is killed completely. We should still
+ // be able to blacklist the entire node.
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ // Lets say that executor 1 dies completely. We get some task failures, but
+ // the taskset then finishes successfully (elsewhere).
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+ }
+ blacklist.handleRemovedExecutor("1")
+ blacklist.updateBlacklistForSuccessfulTaskSet(
+ stageId = 0,
+ stageAttemptId = 0,
+ taskSetBlacklist0.execToFailures)
+ assert(blacklist.isExecutorBlacklisted("1"))
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2)
+
+ // Now another executor gets spun up on that host, but it also dies.
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ (0 until 4).foreach { partition =>
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+ }
+ blacklist.handleRemovedExecutor("2")
+ blacklist.updateBlacklistForSuccessfulTaskSet(
+ stageId = 1,
+ stageAttemptId = 0,
+ taskSetBlacklist1.execToFailures)
+ // We've now had two bad executors on hostA, so we should blacklist the entire node.
+ assert(blacklist.isExecutorBlacklisted("1"))
+ assert(blacklist.isExecutorBlacklisted("2"))
+ assert(blacklist.isNodeBlacklisted("hostA"))
+
+ // Advance the clock so that executor 1 should no longer be explicitly blacklisted, but
+ // everything else should still be blacklisted.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2 + 1)
+ blacklist.applyBlacklistTimeout()
+ assert(!blacklist.isExecutorBlacklisted("1"))
+ assert(blacklist.isExecutorBlacklisted("2"))
+ assert(blacklist.isNodeBlacklisted("hostA"))
+ // make sure we don't leak memory
+ assert(!blacklist.executorIdToBlacklistStatus.contains("1"))
+ assert(!blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
+ // Advance the timeout again so now hostA should be removed from the blacklist.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2)
+ blacklist.applyBlacklistTimeout()
+ assert(!blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
+ }
+
+ test("task failures expire with time") {
+ // Verifies that 2 failures within the timeout period cause an executor to be blacklisted, but
+ // if task failures are spaced out by more than the timeout period, the first failure is timed
+ // out, and the executor isn't blacklisted.
+ var stageId = 0
+ def failOneTaskInTaskSet(exec: String): Unit = {
+ val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
+ taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
+ stageId += 1
+ }
+ failOneTaskInTaskSet(exec = "1")
+ // We have one sporadic failure on exec 2, but that's it. Later checks ensure that we never
+ // blacklist executor 2 despite this one failure.
+ failOneTaskInTaskSet(exec = "2")
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ assert(blacklist.nextExpiryTime === Long.MaxValue)
+
+ // We advance the clock past the expiry time.
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ val t0 = clock.getTimeMillis()
+ blacklist.applyBlacklistTimeout()
+ assert(blacklist.nextExpiryTime === Long.MaxValue)
+ failOneTaskInTaskSet(exec = "1")
+
+ // Because the 2nd failure on executor 1 happened past the expiry time, nothing should have been
+ // blacklisted.
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+
+ // Now we add one more failure, within the timeout, and it should be counted.
+ clock.setTime(t0 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+ val t1 = clock.getTimeMillis()
+ failOneTaskInTaskSet(exec = "1")
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ assert(blacklist.nextExpiryTime === t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ // Add failures on executor 3, make sure it gets put on the blacklist.
+ clock.setTime(t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+ val t2 = clock.getTimeMillis()
+ failOneTaskInTaskSet(exec = "3")
+ failOneTaskInTaskSet(exec = "3")
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "3"))
+ assert(blacklist.nextExpiryTime === t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ // Now we go past the timeout for executor 1, so it should be dropped from the blacklist.
+ clock.setTime(t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("3"))
+ assert(blacklist.nextExpiryTime === t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ // Make sure that we update correctly when we go from having blacklisted executors to
+ // just having tasks with timeouts.
+ clock.setTime(t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+ failOneTaskInTaskSet(exec = "4")
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("3"))
+ assert(blacklist.nextExpiryTime === t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+ clock.setTime(t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+ blacklist.applyBlacklistTimeout()
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ // we've got one task failure still, but we don't bother setting nextExpiryTime to it, to
+ // avoid wasting time checking for expiry of individual task failures.
+ assert(blacklist.nextExpiryTime === Long.MaxValue)
+ }
+
+ test("task failure timeout works as expected for long-running tasksets") {
+ // This ensures that we don't trigger spurious blacklisting for long tasksets, when the taskset
+ // finishes long after the task failures. We create two tasksets, each with one failure.
+ // Individually they shouldn't cause any blacklisting since there is only one failure.
+ // Furthermore, we space the failures out so far that even when both tasksets have completed,
+ // we still don't trigger any blacklisting.
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+ // Taskset1 has one failure immediately
+ taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
+ // Then we have a *long* delay, much longer than the timeout, before any other failures or
+ // taskset completion
+ clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
+ // After the long delay, we have one failure on taskset 2, on the same executor
+ taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
+ // Finally, we complete both tasksets. It's important here to complete taskset2 *first*. We
+ // want to make sure that when taskset 1 finishes, even though we've now got two task failures,
+ // we realize that the task failure we just added was well before the timeout.
+ clock.advance(1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId = 2, 0, taskSetBlacklist2.execToFailures)
+ clock.advance(1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(stageId = 1, 0, taskSetBlacklist1.execToFailures)
+
+ // Make sure nothing was blacklisted
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+ }
+
+ test("only blacklist nodes for the application when enough executors have failed on that " +
+ "specific host") {
+ // we blacklist executors on two different hosts -- make sure that doesn't lead to any
+ // node blacklisting
+ val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+
+ val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
+ taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+
+ // Finally, blacklist another executor on the same node as the original blacklisted executor,
+ // and make sure this time we *do* blacklist the node.
+ val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
+ taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
+ blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
+ assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
+ assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
+ }
test("blacklist still respects legacy configs") {
val conf = new SparkConf().setMaster("local")
@@ -68,6 +414,8 @@ class BlacklistTrackerSuite extends SparkFunSuite {
config.MAX_TASK_ATTEMPTS_PER_NODE,
config.MAX_FAILURES_PER_EXEC_STAGE,
config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+ config.MAX_FAILURES_PER_EXEC,
+ config.MAX_FAILED_EXEC_PER_NODE,
config.BLACKLIST_TIMEOUT_CONF
).foreach { config =>
conf.set(config.key, "0")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a0b6268331..304dc9d47e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -21,14 +21,15 @@ import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
-import org.mockito.Matchers.{anyInt, anyString, eq => meq}
-import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
+import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
+import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
+import org.apache.spark.storage.BlockManagerId
class FakeSchedulerBackend extends SchedulerBackend {
def start() {}
@@ -44,6 +45,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
var failedTaskSetReason: String = null
var failedTaskSet = false
+ var blacklist: BlacklistTracker = null
var taskScheduler: TaskSchedulerImpl = null
var dagScheduler: DAGScheduler = null
@@ -82,11 +84,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+ blacklist = mock[BlacklistTracker]
val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
conf.set(config.BLACKLIST_ENABLED, true)
sc = new SparkContext(conf)
taskScheduler =
- new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+ new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -408,6 +411,95 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
assert(tsm.isZombie)
}
+
+ // the task sets complete, so the tracker should be notified of the successful ones
+ verify(blacklist, times(1)).updateBlacklistForSuccessfulTaskSet(
+ stageId = 0,
+ stageAttemptId = 0,
+ failuresByExec = stageToMockTaskSetBlacklist(0).execToFailures)
+ verify(blacklist, times(1)).updateBlacklistForSuccessfulTaskSet(
+ stageId = 1,
+ stageAttemptId = 0,
+ failuresByExec = stageToMockTaskSetBlacklist(1).execToFailures)
+ // but we shouldn't update for the failed taskset
+ verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(
+ stageId = meq(2),
+ stageAttemptId = anyInt(),
+ failuresByExec = anyObject())
+ }
+
+ test("scheduled tasks obey node and executor blacklists") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ (0 to 2).foreach { stageId =>
+ val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ }
+
+ val offers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1),
+ new WorkerOffer("executor2", "host1", 1),
+ new WorkerOffer("executor3", "host2", 10),
+ new WorkerOffer("executor4", "host3", 1)
+ )
+
+ // setup our mock blacklist:
+ // host1, executor0 & executor3 are completely blacklisted
+ // This covers everything *except* one core on executor4 / host3, so that everything is still
+ // schedulable.
+ when(blacklist.isNodeBlacklisted("host1")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor0")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor3")).thenReturn(true)
+
+ val stageToTsm = (0 to 2).map { stageId =>
+ val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+ stageId -> tsm
+ }.toMap
+
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ firstTaskAttempts.foreach { task => logInfo(s"scheduled $task on ${task.executorId}") }
+ assert(firstTaskAttempts.size === 1)
+ assert(firstTaskAttempts.head.executorId === "executor4")
+ ('0' until '2').foreach { hostNum =>
+ verify(blacklist, atLeast(1)).isNodeBlacklisted("host" + hostNum)
+ }
+ }
+
+ test("abort stage when all executors are blacklisted") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = stageToMockTaskSetManager(0)
+
+ // first just submit some offers so the scheduler knows about all the executors
+ taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 2),
+ WorkerOffer("executor1", "host0", 2),
+ WorkerOffer("executor2", "host0", 2),
+ WorkerOffer("executor3", "host1", 2)
+ ))
+
+ // now say our blacklist updates to blacklist a bunch of resources, but *not* everything
+ when(blacklist.isNodeBlacklisted("host1")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor0")).thenReturn(true)
+
+ // make an offer on the blacklisted resources. We won't schedule anything, but also won't
+ // abort yet, since we know of other resources that work
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 2),
+ WorkerOffer("executor3", "host1", 2)
+ )).flatten.size === 0)
+ assert(!tsm.isZombie)
+
+ // now update the blacklist so that everything really is blacklisted
+ when(blacklist.isExecutorBlacklisted("executor1")).thenReturn(true)
+ when(blacklist.isExecutorBlacklisted("executor2")).thenReturn(true)
+ assert(taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 2),
+ WorkerOffer("executor3", "host1", 2)
+ )).flatten.size === 0)
+ assert(tsm.isZombie)
+ verify(tsm).abort(anyString(), anyObject())
}
/**
@@ -650,6 +742,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
+ test("scheduler checks for executors that can be expired from blacklist") {
+ taskScheduler = setupScheduler()
+
+ taskScheduler.submitTasks(FakeTask.createTaskSet(1, 0))
+ taskScheduler.resourceOffers(IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ verify(blacklist).applyBlacklistTimeout()
+ }
+
test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
index 8c902af568..6b52c10b2c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -85,9 +85,9 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
Seq("exec1", "exec2").foreach { exec =>
assert(
- execToFailures(exec).taskToFailureCount === Map(
- 0 -> 1,
- 1 -> 1
+ execToFailures(exec).taskToFailureCountAndFailureTime === Map(
+ 0 -> (1, 0),
+ 1 -> (1, 0)
)
)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index abc8fff30e..2f5b029a96 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -183,7 +183,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdates = taskSet.tasks.head.metrics.internalAccums
// Offer a host with NO_PREF as the constraint,
@@ -236,7 +236,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// An executor that is not NODE_LOCAL should be rejected.
assert(manager.resourceOffer("execC", "host2", ANY) === None)
@@ -257,7 +257,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq() // Last task has no locality prefs
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
@@ -286,7 +286,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq() // Last task has no locality prefs
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
@@ -306,7 +306,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host2"))
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -344,7 +344,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host3"))
)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -376,7 +376,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -393,7 +393,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
@@ -426,7 +426,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, 4, clock)
+ // We don't directly use the application blacklist, but its presence triggers blacklisting
+ // within the taskset.
+ val blacklistTrackerOpt = Some(new BlacklistTracker(conf, clock))
+ val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)
{
val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
@@ -515,7 +518,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host2", "execC")),
Seq())
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
// Add a new executor
@@ -546,7 +549,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+ val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
sched.addExecutor("execA", "host1")
manager.executorAdded()
sched.addExecutor("execC", "host2")
@@ -579,7 +582,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execA")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
// Set allowed locality to ANY
@@ -670,7 +673,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(),
Seq(TaskLocation("host3", "execC")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
@@ -698,7 +701,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(),
Seq(TaskLocation("host3")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// node-local tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
@@ -720,7 +723,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
@@ -740,7 +743,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
@@ -760,7 +763,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2", "execB.1")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
// Add a new executor
@@ -794,7 +797,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host2")),
Seq(TaskLocation("hdfs_cache_host3")))
val clock = new ManualClock
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execA")
manager.executorAdded()
@@ -822,7 +825,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
val clock = new ManualClock()
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
@@ -876,7 +879,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation.quantile", "0.6")
val clock = new ManualClock()
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
@@ -980,17 +983,17 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
- val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock)
assert(manager.name === "TaskSet_0.0")
// Make sure a task set with the same stage ID but different attempt ID has a unique name
val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
- val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
+ val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
assert(manager2.name === "TaskSet_0.1")
// Make sure a task set with the same attempt ID but different stage ID also has a unique name
val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
- val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
+ val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, clock = new ManualClock)
assert(manager3.name === "TaskSet_1.1")
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 7e466d7dc1..07bcd4aa7f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1316,6 +1316,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.blacklist.timeout</code></td>
+ <td>1h</td>
+ <td>
+ (Experimental) How long a node or executor is blacklisted for the entire application before it
+ is unconditionally removed from the blacklist to allow new tasks to be attempted on it.
+ </td>
+</tr>
+<tr>
<td><code>spark.blacklist.task.maxTaskAttemptsPerExecutor</code></td>
<td>1</td>
<td>
@@ -1348,6 +1356,28 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.blacklist.application.maxFailedTasksPerExecutor</code></td>
+ <td>2</td>
+ <td>
+ (Experimental) How many different tasks must fail on one executor, in successful task sets,
+ before the executor is blacklisted for the entire application. Blacklisted executors will
+ be automatically added back to the pool of available resources after the timeout specified by
+ <code>spark.blacklist.timeout</code>. Note, however, that with dynamic allocation such executors
+ may be marked as idle and reclaimed by the cluster manager.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.blacklist.application.maxFailedExecutorsPerNode</code></td>
+ <td>2</td>
+ <td>
+ (Experimental) How many different executors must be blacklisted for the entire application,
+ before the node is blacklisted for the entire application. Blacklisted nodes will
+ be automatically added back to the pool of available resources after the timeout specified by
+ <code>spark.blacklist.timeout</code>. Note, however, that with dynamic allocation the executors
+ on the node may be marked as idle and reclaimed by the cluster manager.
+ </td>
+</tr>
+<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
<td>
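As a usage note (not part of the patch): a minimal sketch of an application opting into these settings through SparkConf. The values are illustrative, and spark.blacklist.enabled is assumed to be the existing top-level switch from the earlier task-level blacklisting work.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only: enable blacklisting and tune the application-level limits
// documented above.
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("blacklist-example")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.timeout", "1h")
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
val sc = new SparkContext(conf)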
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0378ef4fac..f79c66b9ff 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -692,11 +692,11 @@ private[spark] class ApplicationMaster(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
+ case r: RequestExecutors =>
Option(allocator) match {
case Some(a) =>
- if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
- localityAwareTasks, hostToLocalTaskCount)) {
+ if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
+ r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
resetAllocatorInterval()
}
context.reply(true)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b66d1cf08..e498932e51 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -114,6 +114,8 @@ private[yarn] class YarnAllocator(
@volatile private var targetNumExecutors =
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+ private var currentNodeBlacklist = Set.empty[String]
+
// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
// was lost.
@@ -217,18 +219,35 @@ private[yarn] class YarnAllocator(
* @param localityAwareTasks number of locality aware tasks to be used as container placement hint
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
+ * @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
+ * containers on them. It will be used to update the application master's
+ * blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
- hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+ hostToLocalTaskCount: Map[String, Int],
+ nodeBlacklist: Set[String]): Boolean = synchronized {
this.numLocalityAwareTasks = localityAwareTasks
this.hostToLocalTaskCounts = hostToLocalTaskCount
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
+
+ // Update blacklist information to the YARN ResourceManager for this application,
+ // in order to avoid allocating new containers on the problematic nodes.
+ val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
+ val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
+ if (blacklistAdditions.nonEmpty) {
+ logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
+ }
+ if (blacklistRemovals.nonEmpty) {
+ logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
+ }
+ amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
+ currentNodeBlacklist = nodeBlacklist
true
} else {
false
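A stand-alone restatement of the delta logic added above, since the AMRMClient updateBlacklist call expects the changes since the last report rather than the full blacklist; the helper name is illustrative and does not exist in the patch.

// Illustrative helper mirroring the logic above: compute what to add to and what to
// remove from YARN's blacklist by set difference against the previously reported set.
def blacklistDelta(
    previouslyReported: Set[String],
    current: Set[String]): (Set[String], Set[String]) =
  (current -- previouslyReported, previouslyReported -- current)

// e.g. blacklistDelta(Set("hostA"), Set("hostA", "hostB")) == (Set("hostB"), Set())
//      blacklistDelta(Set("hostA", "hostB"), Set())        == (Set(), Set("hostA", "hostB"))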
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 2f9ea1911f..cbc6e60e83 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -121,13 +121,21 @@ private[spark] abstract class YarnSchedulerBackend(
}
}
+ private[cluster] def prepareRequestExecutors(requestedTotal: Int): RequestExecutors = {
+ val nodeBlacklist: Set[String] = scheduler.nodeBlacklist()
+    // Ignore locality preferences for nodes that are blacklisted
+ val filteredHostToLocalTaskCount =
+ hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklist.contains(k) }
+ RequestExecutors(requestedTotal, localityAwareTasks, filteredHostToLocalTaskCount,
+ nodeBlacklist)
+ }
+
/**
* Request executors from the ApplicationMaster by specifying the total number desired.
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
- yarnSchedulerEndpointRef.ask[Boolean](
- RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+ yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}
/**
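A minimal sketch of the filtering performed by prepareRequestExecutors above, with an illustrative helper name: locality hints that point at blacklisted nodes are dropped before the request is sent to the ApplicationMaster.

// Illustrative restatement of the filtering in prepareRequestExecutors.
def filterLocalityHints(
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String]): Map[String, Int] =
  hostToLocalTaskCount.filter { case (host, _) => !nodeBlacklist.contains(host) }

// e.g. filterLocalityHints(Map("a" -> 1, "b" -> 2), Set("a")) == Map("b" -> 2)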
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 994dc75d34..331bad4fd8 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.yarn
import java.util.{Arrays, List => JList}
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
@@ -90,7 +92,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
override def equals(other: Any): Boolean = false
}
- def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+ def createAllocator(
+ maxExecutors: Int = 5,
+ rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
val args = Array(
"--jar", "somejar.jar",
"--class", "SomeClass")
@@ -202,7 +206,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
- handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (3)
@@ -213,7 +217,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (1)
}
@@ -224,7 +228,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
- handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (3)
@@ -234,7 +238,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (2)
- handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (0)
handler.getNumExecutorsRunning should be (2)
@@ -250,7 +254,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
- handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
val statuses = Seq(container1, container2).map { c =>
@@ -272,7 +276,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set.empty)
val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
@@ -286,6 +290,21 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumUnexpectedContainerRelease should be (2)
}
+ test("blacklisted nodes reflected in amClient requests") {
+    // Internally we track the set of blacklisted nodes, but YARN wants us to send *changes*
+ // to the blacklist. This makes sure we are sending the right updates.
+ val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+ val handler = createAllocator(4, mockAmClient)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
+ verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq().asJava)
+
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set("hostA", "hostB"))
+ verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq().asJava)
+
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
+ verify(mockAmClient).updateBlacklist(Seq().asJava, Seq("hostA", "hostB").asJava)
+ }
+
test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
new file mode 100644
index 0000000000..ffa0b58ee7
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.mockito.Mockito.when
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.serializer.JavaSerializer
+
+class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext {
+
+ test("RequestExecutors reflects node blacklist and is serializable") {
+ sc = new SparkContext("local", "YarnSchedulerBackendSuite")
+ val sched = mock[TaskSchedulerImpl]
+ when(sched.sc).thenReturn(sc)
+ val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) {
+ def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
+ this.hostToLocalTaskCount = hostToLocalTaskCount
+ }
+ }
+ val ser = new JavaSerializer(sc.conf).newInstance()
+ for {
+ blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
+ numRequested <- 0 until 10
+ hostToLocalCount <- IndexedSeq(
+ Map[String, Int](),
+ Map("a" -> 1, "b" -> 2)
+ )
+ } {
+ yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount)
+ when(sched.nodeBlacklist()).thenReturn(blacklist)
+ val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested)
+ assert(req.requestedTotal === numRequested)
+ assert(req.nodeBlacklist === blacklist)
+ assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
+ // Serialize to make sure serialization doesn't throw an error
+ ser.serialize(req)
+ }
+ }
+
+}