author     Imran Rashid <irashid@cloudera.com>    2016-10-12 16:43:03 -0500
committer  Imran Rashid <irashid@cloudera.com>    2016-10-12 16:43:03 -0500
commit     9ce7d3e542e786c62f047c13f3001e178f76e06a (patch)
tree       6d43e48a1d969fb70347b8540b0bb50e4456b6d6
parent     47776e7c0c68590fe446cef910900b1aaead06f9 (diff)
[SPARK-17675][CORE] Expand Blacklist for TaskSets
## What changes were proposed in this pull request?

This is a step along the way to SPARK-8425. To enable incremental review, the first step proposed here is to expand the blacklisting within tasksets. In particular, this will enable blacklisting for

* (task, executor) pairs (this already exists via an undocumented config)
* (task, node)
* (taskset, executor)
* (taskset, node)

Adding (task, node) is critical to making Spark fault tolerant to one bad disk in a cluster, without requiring careful tuning of "spark.task.maxFailures". The other additions are also important to avoid many misleading task failures and long scheduling delays when there is one bad node on a large cluster.

Note that some of the code changes here aren't strictly required for just this -- they put pieces in place for SPARK-8425 even though they are not used yet (e.g. the `BlacklistTracker` helper is a little out of place, `TaskSetBlacklist` holds onto a little more info than it needs for just this change, and `ExecutorFailuresInTaskSet` is more complex than it needs to be).

## How was this patch tested?

Added unit tests; ran tests via Jenkins.

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes #15249 from squito/taskset_blacklist_only.
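For reference, below is a minimal sketch of how an application could opt in to the expanded blacklisting once this patch is applied. The keys are the ones introduced in `core/src/main/scala/org/apache/spark/internal/config/package.scala` in this change; the values simply spell out the defaults, and the application name is illustrative.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver-side setup enabling the new taskset-level blacklist.
val conf = new SparkConf()
  .setAppName("blacklist-example")
  .set("spark.blacklist.enabled", "true")
  // a given task may be attempted at most once per executor and twice per node
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
  // an executor is blacklisted for a stage after 2 failed tasks; a node after 2 bad executors
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")
val sc = new SparkContext(conf)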
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala45
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala114
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala50
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala128
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala276
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala52
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala81
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala22
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala163
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala131
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala4
-rw-r--r--docs/configuration.md43
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala3
17 files changed, 964 insertions, 198 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 51a699f41d..c9c342df82 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
- DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
+ DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
+ DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
+ "Please use the new blacklisting options, spark.blacklist.*")
)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 42690844f9..7ca3c103db 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -92,6 +92,16 @@ case class FetchFailed(
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
}
+
+ /**
+ * Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
+ * 4 task failures, instead we immediately go back to the stage which generated the map output,
+ * and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
+ * presumably it's not the fault of the executor where the task ran, but the executor which
+ * stored the data. This is especially important because we might rack up a bunch of
+ * fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
+ */
+ override def countTowardsTaskFailures: Boolean = false
}
/**
@@ -204,6 +214,7 @@ case object TaskResultLost extends TaskFailedReason {
@DeveloperApi
case object TaskKilled extends TaskFailedReason {
override def toErrorString: String = "TaskKilled (killed intentionally)"
+ override def countTowardsTaskFailures: Boolean = false
}
/**
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 517fc3e9e9..497ca92c7b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -17,6 +17,8 @@
package org.apache.spark.internal
+import java.util.concurrent.TimeUnit
+
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.Utils
@@ -91,6 +93,49 @@ package object config {
.toSequence
.createWithDefault(Nil)
+ private[spark] val MAX_TASK_FAILURES =
+ ConfigBuilder("spark.task.maxFailures")
+ .intConf
+ .createWithDefault(4)
+
+ // Blacklist confs
+ private[spark] val BLACKLIST_ENABLED =
+ ConfigBuilder("spark.blacklist.enabled")
+ .booleanConf
+ .createOptional
+
+ private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
+ ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
+ .intConf
+ .createWithDefault(1)
+
+ private[spark] val MAX_TASK_ATTEMPTS_PER_NODE =
+ ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode")
+ .intConf
+ .createWithDefault(2)
+
+ private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
+ ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
+ .intConf
+ .createWithDefault(2)
+
+ private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
+ ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
+ .intConf
+ .createWithDefault(2)
+
+ private[spark] val BLACKLIST_TIMEOUT_CONF =
+ ConfigBuilder("spark.blacklist.timeout")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
+ private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
+ ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
+ .internal()
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+ // End blacklist confs
+
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
new file mode 100644
index 0000000000..fca4c6d37e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+ private val DEFAULT_TIMEOUT = "1h"
+
+ /**
+ * Returns true if the blacklist is enabled, based on checking the configuration in the following
+ * order:
+ * 1. Is it specifically enabled or disabled?
+ * 2. Is it enabled via the legacy timeout conf?
+ * 3. Default is off
+ */
+ def isBlacklistEnabled(conf: SparkConf): Boolean = {
+ conf.get(config.BLACKLIST_ENABLED) match {
+ case Some(enabled) =>
+ enabled
+ case None =>
+ // if they've got a non-zero setting for the legacy conf, always enable the blacklist,
+ // otherwise, use the default.
+ val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+ conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).exists { legacyTimeout =>
+ if (legacyTimeout == 0) {
+ logWarning(s"Turning off blacklisting due to legacy configuration: $legacyKey == 0")
+ false
+ } else {
+ logWarning(s"Turning on blacklisting due to legacy configuration: $legacyKey > 0")
+ true
+ }
+ }
+ }
+ }
+
+ def getBlacklistTimeout(conf: SparkConf): Long = {
+ conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse {
+ conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse {
+ Utils.timeStringAsMs(DEFAULT_TIMEOUT)
+ }
+ }
+ }
+
+ /**
+ * Verify that blacklist configurations are consistent; if not, throw an exception. Should only
+ * be called if blacklisting is enabled.
+ *
+ * The configuration for the blacklist is expected to adhere to a few invariants. Default
+ * values follow these rules of course, but users may unwittingly change one configuration
+ * without making the corresponding adjustment elsewhere. This ensures we fail fast when
+ * there are such misconfigurations.
+ */
+ def validateBlacklistConfs(conf: SparkConf): Unit = {
+
+ def mustBePos(k: String, v: String): Unit = {
+ throw new IllegalArgumentException(s"$k was $v, but must be > 0.")
+ }
+
+ Seq(
+ config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
+ config.MAX_TASK_ATTEMPTS_PER_NODE,
+ config.MAX_FAILURES_PER_EXEC_STAGE,
+ config.MAX_FAILED_EXEC_PER_NODE_STAGE
+ ).foreach { config =>
+ val v = conf.get(config)
+ if (v <= 0) {
+ mustBePos(config.key, v.toString)
+ }
+ }
+
+ val timeout = getBlacklistTimeout(conf)
+ if (timeout <= 0) {
+ // first, figure out where the timeout came from, to include the right conf in the message.
+ conf.get(config.BLACKLIST_TIMEOUT_CONF) match {
+ case Some(t) =>
+ mustBePos(config.BLACKLIST_TIMEOUT_CONF.key, timeout.toString)
+ case None =>
+ mustBePos(config.BLACKLIST_LEGACY_TIMEOUT_CONF.key, timeout.toString)
+ }
+ }
+
+ val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
+ val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
+
+ if (maxNodeAttempts >= maxTaskFailures) {
+ throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
+ s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+ s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
+ s"Spark will not be robust to one bad node. Decrease " +
+ s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
+ s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
+ }
+ }
+}
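As a side note, a minimal sketch of the fail-fast check in `validateBlacklistConfs` above; it assumes the same `private[spark]`/`private[scheduler]` access that test code has (the `BlacklistTrackerSuite` added below exercises the same behaviour):

import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.scheduler.BlacklistTracker

val conf = new SparkConf()
  .set(config.MAX_TASK_FAILURES, 4)
  // maxTaskAttemptsPerNode >= maxFailures: a task could exhaust all of its allowed
  // failures on a single bad node before ever being tried on a healthy one ...
  .set(config.MAX_TASK_ATTEMPTS_PER_NODE.key, "4")
// ... so this throws IllegalArgumentException instead of silently accepting the configs.
BlacklistTracker.validateBlacklistConfs(conf)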
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
new file mode 100644
index 0000000000..20ab27d127
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.HashMap
+
+/**
+ * Small helper for tracking failed tasks for blacklisting purposes. Info on all failures on one
+ * executor, within one task set.
+ */
+private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
+ /**
+ * Mapping from the index of a task in the taskset to the number of times it has failed on this
+ * executor.
+ */
+ val taskToFailureCount = HashMap[Int, Int]()
+
+ def updateWithFailure(taskIndex: Int): Unit = {
+ val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
+ taskToFailureCount(taskIndex) = prevFailureCount + 1
+ }
+
+ def numUniqueTasksWithFailures: Int = taskToFailureCount.size
+
+ /**
+ * Return the number of times this executor has failed on the given task index.
+ */
+ def getNumTaskFailures(index: Int): Int = {
+ taskToFailureCount.getOrElse(index, 0)
+ }
+
+ override def toString(): String = {
+ s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
+ s"tasksToFailureCount = $taskToFailureCount"
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0ad4730fe2..3e3f1ad031 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,14 +22,14 @@ import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.Set
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -57,7 +57,7 @@ private[spark] class TaskSchedulerImpl(
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
- def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
+ def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
val conf = sc.conf
@@ -100,7 +100,7 @@ private[spark] class TaskSchedulerImpl(
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
- protected val executorsByHost = new HashMap[String, HashSet[String]]
+ protected val hostToExecutors = new HashMap[String, HashSet[String]]
protected val hostsByRack = new HashMap[String, HashSet[String]]
@@ -243,8 +243,8 @@ private[spark] class TaskSchedulerImpl(
}
}
manager.parent.removeSchedulable(manager)
- logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s"
- .format(manager.taskSet.id, manager.parent.name))
+ logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
+ s" ${manager.parent.name}")
}
private def resourceOfferSingleTaskSet(
@@ -291,11 +291,11 @@ private[spark] class TaskSchedulerImpl(
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
- if (!executorsByHost.contains(o.host)) {
- executorsByHost(o.host) = new HashSet[String]()
+ if (!hostToExecutors.contains(o.host)) {
+ hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToTaskCount.contains(o.executorId)) {
- executorsByHost(o.host) += o.executorId
+ hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount(o.executorId) = 0
@@ -334,7 +334,7 @@ private[spark] class TaskSchedulerImpl(
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
- taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+ taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
@@ -542,10 +542,10 @@ private[spark] class TaskSchedulerImpl(
executorIdToTaskCount -= executorId
val host = executorIdToHost(executorId)
- val execs = executorsByHost.getOrElse(host, new HashSet)
+ val execs = hostToExecutors.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
- executorsByHost -= host
+ hostToExecutors -= host
for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
hosts -= host
if (hosts.isEmpty) {
@@ -565,11 +565,11 @@ private[spark] class TaskSchedulerImpl(
}
def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
- executorsByHost.get(host).map(_.toSet)
+ hostToExecutors.get(host).map(_.toSet)
}
def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
- executorsByHost.contains(host)
+ hostToExecutors.contains(host)
}
def hasHostAliveOnRack(rack: String): Boolean = synchronized {
@@ -662,5 +662,4 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
-
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
new file mode 100644
index 0000000000..f4b0f55b76
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.util.Clock
+
+/**
+ * Handles blacklisting executors and nodes within a taskset. This includes blacklisting specific
+ * (task, executor) / (task, node) pairs, and also completely blacklisting executors and nodes
+ * for the entire taskset.
+ *
+ * THREADING: This class is a helper to [[TaskSetManager]]; as with the methods in
+ * [[TaskSetManager]] this class is designed only to be called from code with a lock on the
+ * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
+ */
+private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock)
+ extends Logging {
+
+ private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
+ private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
+ private val MAX_FAILURES_PER_EXEC_STAGE = conf.get(config.MAX_FAILURES_PER_EXEC_STAGE)
+ private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)
+
+ /**
+ * A map from each executor to the task failures on that executor.
+ */
+ val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
+
+ /**
+ * Map from node to all executors on it with failures. Needed because we want to know about
+ * executors on a node even after they have died. (We don't want to bother tracking the
+ * node -> execs mapping in the usual case when there aren't any failures).
+ */
+ private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
+ private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
+ private val blacklistedExecs = new HashSet[String]()
+ private val blacklistedNodes = new HashSet[String]()
+
+ /**
+ * Return true if this executor is blacklisted for the given task. This does *not*
+ * need to return true if the executor is blacklisted for the entire stage.
+ * That is to keep this method as fast as possible in the inner-loop of the
+ * scheduler, where those filters will have already been applied.
+ */
+ def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
+ execToFailures.get(executorId).exists { execFailures =>
+ execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
+ }
+ }
+
+ def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
+ nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
+ }
+
+ /**
+ * Return true if this executor is blacklisted for the given stage. Completely ignores
+ * anything to do with the node the executor is on. That
+ * is to keep this method as fast as possible in the inner-loop of the scheduler, where those
+ * filters will already have been applied.
+ */
+ def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
+ blacklistedExecs.contains(executorId)
+ }
+
+ def isNodeBlacklistedForTaskSet(node: String): Boolean = {
+ blacklistedNodes.contains(node)
+ }
+
+ private[scheduler] def updateBlacklistForFailedTask(
+ host: String,
+ exec: String,
+ index: Int): Unit = {
+ val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
+ execFailures.updateWithFailure(index)
+
+ // check if this task has also failed on other executors on the same host -- if it's gone
+ // over the limit, blacklist this task from the entire host.
+ val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
+ execsWithFailuresOnNode += exec
+ val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
+ execToFailures.get(exec).map { failures =>
+ // We count task attempts here, not the number of unique executors with failures. This is
+ // because jobs are aborted based on the number of task attempts; if we counted unique
+ // executors, it would be hard to set the configs to ensure that you try another
+ // node before hitting the max number of task failures.
+ failures.getNumTaskFailures(index)
+ }
+ }.sum
+ if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
+ nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
+ }
+
+ // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
+ if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+ if (blacklistedExecs.add(exec)) {
+ logInfo(s"Blacklisting executor ${exec} for stage $stageId")
+ // This executor has been pushed into the blacklist for this stage. Let's check if it
+ // pushes the whole node into the blacklist.
+ val blacklistedExecutorsOnNode =
+ execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+ if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
+ if (blacklistedNodes.add(host)) {
+ logInfo(s"Blacklisting ${host} for stage $stageId")
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 226bed284a..9491bc7a04 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -22,9 +22,7 @@ import java.nio.ByteBuffer
import java.util.Arrays
import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.math.{max, min}
import scala.util.control.NonFatal
@@ -53,19 +51,9 @@ private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
- clock: Clock = new SystemClock())
- extends Schedulable with Logging {
+ clock: Clock = new SystemClock()) extends Schedulable with Logging {
- val conf = sched.sc.conf
-
- /*
- * Sometimes if an executor is dead or in an otherwise invalid state, the driver
- * does not realize right away leading to repeated task failures. If enabled,
- * this temporarily prevents a task from re-launching on an executor where
- * it just failed.
- */
- private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
- conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
+ private val conf = sched.sc.conf
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
@@ -83,8 +71,6 @@ private[spark] class TaskSetManager(
val copiesRunning = new Array[Int](numTasks)
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)
- // key is taskId (aka TaskInfo.index), value is a Map of executor id to when it failed
- private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0
@@ -98,6 +84,14 @@ private[spark] class TaskSetManager(
var totalResultSize = 0L
var calculatedTasks = 0
+ private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
+ if (BlacklistTracker.isBlacklistEnabled(conf)) {
+ Some(new TaskSetBlacklist(conf, stageId, clock))
+ } else {
+ None
+ }
+ }
+
val runningTasksSet = new HashSet[Long]
override def runningTasks: Int = runningTasksSet.size
@@ -245,12 +239,15 @@ private[spark] class TaskSetManager(
* This method also cleans up any tasks in the list that have already
* been launched, since we want that to happen lazily.
*/
- private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
+ private def dequeueTaskFromList(
+ execId: String,
+ host: String,
+ list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
- if (!executorIsBlacklisted(execId, index)) {
+ if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
@@ -266,19 +263,11 @@ private[spark] class TaskSetManager(
taskAttempts(taskIndex).exists(_.host == host)
}
- /**
- * Is this re-execution of a failed task on an executor it already failed in before
- * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
- */
- private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
- if (failedExecutors.contains(taskId)) {
- val failed = failedExecutors.get(taskId).get
-
- return failed.contains(execId) &&
- clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+ private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = {
+ taskSetBlacklistHelperOpt.exists { blacklist =>
+ blacklist.isNodeBlacklistedForTask(host, index) ||
+ blacklist.isExecutorBlacklistedForTask(execId, index)
}
-
- false
}
/**
@@ -292,8 +281,10 @@ private[spark] class TaskSetManager(
{
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
- def canRunOnHost(index: Int): Boolean =
- !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
+ def canRunOnHost(index: Int): Boolean = {
+ !hasAttemptOnHost(index, host) &&
+ !isTaskBlacklistedOnExecOrNode(index, execId, host)
+ }
if (!speculatableTasks.isEmpty) {
// Check for process-local tasks; note that tasks can be process-local
@@ -366,19 +357,19 @@ private[spark] class TaskSetManager(
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
- for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
+ for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
- for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
+ for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
- for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
+ for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}
@@ -386,14 +377,14 @@ private[spark] class TaskSetManager(
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
- index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
+ index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
- for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
+ for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
}
}
@@ -421,7 +412,11 @@ private[spark] class TaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
- if (!isZombie) {
+ val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
+ blacklist.isNodeBlacklistedForTaskSet(host) ||
+ blacklist.isExecutorBlacklistedForTaskSet(execId)
+ }
+ if (!isZombie && !offerBlacklisted) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
@@ -434,60 +429,59 @@ private[spark] class TaskSetManager(
}
}
- dequeueTask(execId, host, allowedLocality) match {
- case Some((index, taskLocality, speculative)) =>
- // Found a task; do some bookkeeping and return a task description
- val task = tasks(index)
- val taskId = sched.newTaskId()
- // Do various bookkeeping
- copiesRunning(index) += 1
- val attemptNum = taskAttempts(index).size
- val info = new TaskInfo(taskId, index, attemptNum, curTime,
- execId, host, taskLocality, speculative)
- taskInfos(taskId) = info
- taskAttempts(index) = info :: taskAttempts(index)
- // Update our locality level for delay scheduling
- // NO_PREF will not affect the variables related to delay scheduling
- if (maxLocality != TaskLocality.NO_PREF) {
- currentLocalityIndex = getLocalityIndex(taskLocality)
- lastLaunchTime = curTime
- }
- // Serialize and return the task
- val startTime = clock.getTimeMillis()
- val serializedTask: ByteBuffer = try {
- Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
- } catch {
- // If the task cannot be serialized, then there's no point to re-attempt the task,
- // as it will always fail. So just abort the whole task-set.
- case NonFatal(e) =>
- val msg = s"Failed to serialize task $taskId, not attempting to retry it."
- logError(msg, e)
- abort(s"$msg Exception during serialization: $e")
- throw new TaskNotSerializableException(e)
- }
- if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
- !emittedTaskSizeWarning) {
- emittedTaskSizeWarning = true
- logWarning(s"Stage ${task.stageId} contains a task of very large size " +
- s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
- s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
- }
- addRunningTask(taskId)
-
- // We used to log the time it takes to serialize the task, but task size is already
- // a good proxy to task serialization time.
- // val timeTaken = clock.getTime() - startTime
- val taskName = s"task ${info.id} in stage ${taskSet.id}"
- logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
- s" $taskLocality, ${serializedTask.limit} bytes)")
-
- sched.dagScheduler.taskStarted(task, info)
- return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
- taskName, index, serializedTask))
- case _ =>
+ dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
+ // Found a task; do some bookkeeping and return a task description
+ val task = tasks(index)
+ val taskId = sched.newTaskId()
+ // Do various bookkeeping
+ copiesRunning(index) += 1
+ val attemptNum = taskAttempts(index).size
+ val info = new TaskInfo(taskId, index, attemptNum, curTime,
+ execId, host, taskLocality, speculative)
+ taskInfos(taskId) = info
+ taskAttempts(index) = info :: taskAttempts(index)
+ // Update our locality level for delay scheduling
+ // NO_PREF will not affect the variables related to delay scheduling
+ if (maxLocality != TaskLocality.NO_PREF) {
+ currentLocalityIndex = getLocalityIndex(taskLocality)
+ lastLaunchTime = curTime
+ }
+ // Serialize and return the task
+ val startTime = clock.getTimeMillis()
+ val serializedTask: ByteBuffer = try {
+ Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+ } catch {
+ // If the task cannot be serialized, then there's no point to re-attempt the task,
+ // as it will always fail. So just abort the whole task-set.
+ case NonFatal(e) =>
+ val msg = s"Failed to serialize task $taskId, not attempting to retry it."
+ logError(msg, e)
+ abort(s"$msg Exception during serialization: $e")
+ throw new TaskNotSerializableException(e)
+ }
+ if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+ !emittedTaskSizeWarning) {
+ emittedTaskSizeWarning = true
+ logWarning(s"Stage ${task.stageId} contains a task of very large size " +
+ s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+ s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+ }
+ addRunningTask(taskId)
+
+ // We used to log the time it takes to serialize the task, but task size is already
+ // a good proxy to task serialization time.
+ // val timeTaken = clock.getTime() - startTime
+ val taskName = s"task ${info.id} in stage ${taskSet.id}"
+ logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
+ s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
+
+ sched.dagScheduler.taskStarted(task, info)
+ new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
+ taskName, index, serializedTask)
}
+ } else {
+ None
}
- None
}
private def maybeFinishTaskSet() {
@@ -589,37 +583,56 @@ private[spark] class TaskSetManager(
* the hang as quickly as we could have, but we'll always detect the hang eventually, and the
* method is faster in the typical case. In the worst case, this method can take
* O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any task
- * failures (this is because the method picks on unscheduled task, and then iterates through each
- * executor until it finds one that the task hasn't failed on already).
+ * failures (this is because the method picks one unscheduled task, and then iterates through each
+ * executor until it finds one that the task isn't blacklisted on).
*/
- private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
-
- val pendingTask: Option[Int] = {
- // usually this will just take the last pending task, but because of the lazy removal
- // from each list, we may need to go deeper in the list. We poll from the end because
- // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
- // an unschedulable task this way.
- val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
- copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
- }
- if (indexOffset == -1) {
- None
- } else {
- Some(allPendingTasks(indexOffset))
- }
- }
+ private[scheduler] def abortIfCompletelyBlacklisted(
+ hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
+ taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+ // Only look for unschedulable tasks when at least one executor has registered. Otherwise,
+ // task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
+ if (hostToExecutors.nonEmpty) {
+ // find any task that needs to be scheduled
+ val pendingTask: Option[Int] = {
+ // usually this will just take the last pending task, but because of the lazy removal
+ // from each list, we may need to go deeper in the list. We poll from the end because
+ // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
+ // an unschedulable task this way.
+ val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+ copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+ }
+ if (indexOffset == -1) {
+ None
+ } else {
+ Some(allPendingTasks(indexOffset))
+ }
+ }
- // If no executors have registered yet, don't abort the stage, just wait. We probably
- // got here because a task set was added before the executors registered.
- if (executors.nonEmpty) {
- // take any task that needs to be scheduled, and see if we can find some executor it *could*
- // run on
- pendingTask.foreach { taskId =>
- if (executors.forall(executorIsBlacklisted(_, taskId))) {
- val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
- val partition = tasks(taskId).partitionId
- abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
- s" has already failed on executors $execs, and no other executors are available.")
+ pendingTask.foreach { indexInTaskSet =>
+ // try to find some executor this task can run on. It's possible that some *other*
+ // task isn't schedulable anywhere, but we will discover that in some later call,
+ // when that unschedulable task is the last task remaining.
+ val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
+ // Check if the task can run on the node
+ val nodeBlacklisted =
+ taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
+ taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
+ if (nodeBlacklisted) {
+ true
+ } else {
+ // Check if the task can run on any of the executors
+ execsOnHost.forall { exec =>
+ taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) ||
+ taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet)
+ }
+ }
+ }
+ if (blacklistedEverywhere) {
+ val partition = tasks(indexInTaskSet).partitionId
+ abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
+ s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
+ s"can be configured via spark.blacklist.*.")
+ }
}
}
}
@@ -677,8 +690,9 @@ private[spark] class TaskSetManager(
}
if (!successful(index)) {
tasksSuccessful += 1
- logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
- info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
+ logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
+ s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
+ s" ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
@@ -688,7 +702,6 @@ private[spark] class TaskSetManager(
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
- failedExecutors.remove(index)
maybeFinishTaskSet()
}
@@ -706,8 +719,8 @@ private[spark] class TaskSetManager(
val index = info.index
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
- val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
- reason.toErrorString
+ val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," +
+ s" executor ${info.executorId}): ${reason.toErrorString}"
val failureException: Option[Throwable] = reason match {
case fetchFailed: FetchFailed =>
logWarning(failureReason)
@@ -715,7 +728,6 @@ private[spark] class TaskSetManager(
successful(index) = true
tasksSuccessful += 1
}
- // Not adding to failed executors for FetchFailed.
isZombie = true
None
@@ -751,8 +763,8 @@ private[spark] class TaskSetManager(
logWarning(failureReason)
} else {
logInfo(
- s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on executor ${info.host}: " +
- s"${ef.className} (${ef.description}) [duplicate $dupCount]")
+ s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on ${info.host}, executor" +
+ s" ${info.executorId}: ${ef.className} (${ef.description}) [duplicate $dupCount]")
}
ef.exception
@@ -766,9 +778,7 @@ private[spark] class TaskSetManager(
logWarning(failureReason)
None
}
- // always add to failed executors
- failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
- put(info.executorId, clock.getTimeMillis())
+
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
if (successful(index)) {
@@ -780,7 +790,9 @@ private[spark] class TaskSetManager(
addPendingTask(index)
}
- if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
+ if (!isZombie && reason.countTowardsTaskFailures) {
+ taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
+ info.host, info.executorId, index))
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 14c8b664d4..f6015cd51c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -16,10 +16,10 @@
*/
package org.apache.spark.scheduler
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark._
+import org.apache.spark.internal.config
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{
@@ -42,7 +42,10 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
- testScheduler("If preferred node is bad, without blacklist job will fail") {
+ testScheduler("If preferred node is bad, without blacklist job will fail",
+ extraConfs = Seq(
+ config.BLACKLIST_ENABLED.key -> "false"
+ )) {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
@@ -51,37 +54,38 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
assertDataStructuresEmpty(noFailure = false)
}
- // even with the blacklist turned on, if maxTaskFailures is not more than the number
- // of executors on the bad node, then locality preferences will lead to us cycling through
- // the executors on the bad node, and still failing the job
testScheduler(
- "With blacklist on, job will still fail if there are too many bad executors on bad host",
+ "With default settings, job can succeed despite multiple bad executors on node",
extraConfs = Seq(
- // set this to something much longer than the test duration so that executors don't get
- // removed from the blacklist during the test
- ("spark.scheduler.executorTaskBlacklistTime", "10000000")
+ config.BLACKLIST_ENABLED.key -> "true",
+ config.MAX_TASK_FAILURES.key -> "4",
+ "spark.testing.nHosts" -> "2",
+ "spark.testing.nExecutorsPerHost" -> "5",
+ "spark.testing.nCoresPerExecutor" -> "10"
)
) {
- val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
+ // To reliably reproduce the failure that would occur without blacklisting, we have to use 1
+ // task. That way, we ensure this 1 task gets rotated through enough bad executors on the host
+ // to fail the taskSet, before we have a bunch of different tasks fail in the executors so we
+ // blacklist them.
+ // But the point here is -- without blacklisting, we would never schedule anything on the good
+ // host-1 before we hit too many failures trying our preferred host-0.
+ val rdd = new MockRDDWithLocalityPrefs(sc, 1, Nil, badHost)
withBackend(badHostBackend _) {
- val jobFuture = submit(rdd, (0 until 10).toArray)
+ val jobFuture = submit(rdd, (0 until 1).toArray)
awaitJobTermination(jobFuture, duration)
}
- assertDataStructuresEmpty(noFailure = false)
+ assertDataStructuresEmpty(noFailure = true)
}
- // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
- // schedule on a good node and succeed the job
+ // Here we run with the blacklist on, and the default config takes care of having this
+ // robust to one bad node.
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
- // set this to something much longer than the test duration so that executors don't get
- // removed from the blacklist during the test
- ("spark.scheduler.executorTaskBlacklistTime", "10000000"),
- // this has to be higher than the number of executors on the bad host
- ("spark.task.maxFailures", "5"),
+ config.BLACKLIST_ENABLED.key -> "true",
// just to avoid this test taking too long
- ("spark.locality.wait", "10ms")
+ "spark.locality.wait" -> "10ms"
)
) {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
@@ -98,9 +102,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
testScheduler(
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
- // set this to something much longer than the test duration so that executors don't get
- // removed from the blacklist during the test
- "spark.scheduler.executorTaskBlacklistTime" -> "10000000",
+ config.BLACKLIST_ENABLED.key -> "true",
"spark.testing.nHosts" -> "2",
"spark.testing.nExecutorsPerHost" -> "1",
"spark.testing.nCoresPerExecutor" -> "1"
@@ -112,9 +114,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
val pattern = ("Aborting TaskSet 0.0 because task .* " +
- "already failed on executors \\(.*\\), and no other executors are available").r
+ "cannot run anywhere due to node and executor blacklist").r
assert(pattern.findFirstIn(failure.getMessage).isDefined,
s"Couldn't find $pattern in ${failure.getMessage()}")
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
new file mode 100644
index 0000000000..b2e7ec5df0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config
+
+class BlacklistTrackerSuite extends SparkFunSuite {
+
+ test("blacklist still respects legacy configs") {
+ val conf = new SparkConf().setMaster("local")
+ assert(!BlacklistTracker.isBlacklistEnabled(conf))
+ conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 5000L)
+ assert(BlacklistTracker.isBlacklistEnabled(conf))
+ assert(5000 === BlacklistTracker.getBlacklistTimeout(conf))
+ // the new conf takes precedence, though
+ conf.set(config.BLACKLIST_TIMEOUT_CONF, 1000L)
+ assert(1000 === BlacklistTracker.getBlacklistTimeout(conf))
+
+ // if you explicitly set the legacy conf to 0, that also would disable blacklisting
+ conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 0L)
+ assert(!BlacklistTracker.isBlacklistEnabled(conf))
+ // but again, the new conf takes precedence
+ conf.set(config.BLACKLIST_ENABLED, true)
+ assert(BlacklistTracker.isBlacklistEnabled(conf))
+ assert(1000 === BlacklistTracker.getBlacklistTimeout(conf))
+ }
+
+ test("check blacklist configuration invariants") {
+ val conf = new SparkConf().setMaster("yarn-cluster")
+ Seq(
+ (2, 2),
+ (2, 3)
+ ).foreach { case (maxTaskFailures, maxNodeAttempts) =>
+ conf.set(config.MAX_TASK_FAILURES, maxTaskFailures)
+ conf.set(config.MAX_TASK_ATTEMPTS_PER_NODE.key, maxNodeAttempts.toString)
+ val excMsg = intercept[IllegalArgumentException] {
+ BlacklistTracker.validateBlacklistConfs(conf)
+ }.getMessage()
+ assert(excMsg === s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
+ s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+ s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
+ s"Spark will not be robust to one bad node. Decrease " +
+ s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
+ s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
+ }
+
+ conf.remove(config.MAX_TASK_FAILURES)
+ conf.remove(config.MAX_TASK_ATTEMPTS_PER_NODE)
+
+ Seq(
+ config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
+ config.MAX_TASK_ATTEMPTS_PER_NODE,
+ config.MAX_FAILURES_PER_EXEC_STAGE,
+ config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+ config.BLACKLIST_TIMEOUT_CONF
+ ).foreach { config =>
+ conf.set(config.key, "0")
+ val excMsg = intercept[IllegalArgumentException] {
+ BlacklistTracker.validateBlacklistConfs(conf)
+ }.getMessage()
+ assert(excMsg.contains(s"${config.key} was 0, but must be > 0."))
+ conf.remove(config)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 5cd548bbc7..c28aa06623 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -620,9 +620,9 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val duration = Duration(1, SECONDS)
awaitJobTermination(jobFuture, duration)
}
+ assertDataStructuresEmpty()
assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
- assertDataStructuresEmpty()
}
testScheduler("job failure after 4 attempts") {
@@ -634,7 +634,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
awaitJobTermination(jobFuture, duration)
- failure.getMessage.contains("test task failure")
+ assert(failure.getMessage.contains("test task failure"))
}
assertDataStructuresEmpty(noFailure = false)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 61787b54f8..f5f1947661 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import org.scalatest.BeforeAndAfterEach
import org.apache.spark._
+import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
class FakeSchedulerBackend extends SchedulerBackend {
@@ -32,7 +33,6 @@ class FakeSchedulerBackend extends SchedulerBackend {
class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
with Logging {
-
var failedTaskSetException: Option[Throwable] = None
var failedTaskSetReason: String = null
var failedTaskSet = false
@@ -60,10 +60,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
confs.foreach { case (k, v) =>
- sc.conf.set(k, v)
+ conf.set(k, v)
}
+ sc = new SparkContext(conf)
taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
@@ -287,9 +288,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// schedulable on another executor. However, that executor may fail later on, leaving the
// first task with no place to run.
val taskScheduler = setupScheduler(
- // set this to something much longer than the test duration so that executors don't get
- // removed from the blacklist during the test
- "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+ config.BLACKLIST_ENABLED.key -> "true"
)
val taskSet = FakeTask.createTaskSet(2)
@@ -328,8 +327,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.isZombie)
assert(failedTaskSet)
val idx = failedTask.index
- assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx (partition $idx) has " +
- s"already failed on executors (executor0), and no other executors are available.")
+ assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
+ s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be " +
+ s"configured via spark.blacklist.*.")
}
test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
@@ -339,9 +339,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// available and not bail on the job
val taskScheduler = setupScheduler(
- // set this to something much longer than the test duration so that executors don't get
- // removed from the blacklist during the test
- "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+ config.BLACKLIST_ENABLED.key -> "true"
)
val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
@@ -377,7 +375,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskScheduler = setupScheduler()
taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
- (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
+ (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
))
val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
new file mode 100644
index 0000000000..8c902af568
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.{ManualClock, SystemClock}
+
+class TaskSetBlacklistSuite extends SparkFunSuite {
+
+ test("Blacklisting tasks, executors, and nodes") {
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+ .set(config.BLACKLIST_ENABLED.key, "true")
+ val clock = new ManualClock
+
+ val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, clock = clock)
+ clock.setTime(0)
+ // We will mark task 0 & 1 failed on both executor 1 & 2.
+ // We should blacklist all executors on that host, for all tasks in the stage. Note the API
+ // will return false for isExecutorBlacklistedForTaskSet even when the node is blacklisted, so
+ // the executor is only implicitly blacklisted (this makes sense with how the scheduler uses
+ // the blacklist).
+
+ // First, mark task 0 as failed on exec1.
+ // task 0 should be blacklisted on exec1, and nowhere else
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 0)
+ for {
+ executor <- (1 to 4).map(_.toString)
+ index <- 0 until 10
+ } {
+ val shouldBeBlacklisted = (executor == "exec1" && index == 0)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
+ }
+ assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+ // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 1)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+ // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 0)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+ assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+ // Mark another task as failed on exec2 -- now we blacklist exec2, which also leads to
+ // blacklisting the entire node.
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 1)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+ assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+ // Make sure the blacklist has the correct per-task && per-executor responses, over a wider
+ // range of inputs.
+ for {
+ executor <- (1 to 4).map(e => s"exec$e")
+ index <- 0 until 10
+ } {
+ withClue(s"exec = $executor; index = $index") {
+ val badExec = (executor == "exec1" || executor == "exec2")
+ val badIndex = (index == 0 || index == 1)
+ assert(
+ // this ignores whether the executor is blacklisted entirely for the taskset -- that is
+ // intentional, it keeps it fast and is sufficient for usage in the scheduler.
+ taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === (badExec && badIndex))
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet(executor) === badExec)
+ }
+ }
+ assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+ val execToFailures = taskSetBlacklist.execToFailures
+ assert(execToFailures.keySet === Set("exec1", "exec2"))
+
+ Seq("exec1", "exec2").foreach { exec =>
+ assert(
+ execToFailures(exec).taskToFailureCount === Map(
+ 0 -> 1,
+ 1 -> 1
+ )
+ )
+ }
+ }
+
+ test("multiple attempts for the same task count once") {
+ // Make sure that for task blacklisting, the node counts task attempts, not executors, whereas
+ // for stage-level blacklisting we count unique tasks. The reason for the difference is that
+ // with task-attempt blacklisting we want it to be easy to configure things so a node gets
+ // blacklisted before the taskset is completely aborted because of spark.task.maxFailures.
+ // With stage-level blacklisting, on the other hand, we want to be sure we're not just counting
+ // one bad task that has failed many times.
+
+ val conf = new SparkConf().setMaster("local").setAppName("test")
+ .set(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR, 2)
+ .set(config.MAX_TASK_ATTEMPTS_PER_NODE, 3)
+ .set(config.MAX_FAILURES_PER_EXEC_STAGE, 2)
+ .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
+ val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+ // Fail a task twice on hostA, exec:1
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTask("1", 0))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
+ assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+ // Fail the same task once more on hostA, exec:2
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 0)
+ assert(taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
+ assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+ // Fail another task on hostA, exec:1. That executor now has failures on two different tasks,
+ // so it is blacklisted for the task set.
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+ // Fail a third task on hostA, exec:2, so that exec is blacklisted for the whole task set
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 2)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+ // Fail a fourth and a fifth task on hostA, exec:3. Now we've got three executors that are
+ // blacklisted for the taskset, so the whole node gets blacklisted.
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 3)
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 4)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("3"))
+ assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+ }
+
+ test("only blacklist nodes for the task set when all the blacklisted executors are all on " +
+ "same host") {
+ // we blacklist executors on two different hosts within one taskSet -- make sure that doesn't
+ // lead to any node blacklisting
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+ .set(config.BLACKLIST_ENABLED.key, "true")
+ val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+ taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+ taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
+ taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+ assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+ assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostB"))
+ }
+
+}
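To recap what the new suite exercises, here is a hedged sketch of how a scheduler-side check could combine the TaskSetBlacklist queries before offering a task to an executor. The helper `offerable` and its signature are hypothetical; only the TaskSetBlacklist methods themselves come from this patch:

    import org.apache.spark.scheduler.TaskSetBlacklist

    // Hypothetical helper, not part of the patch: a task index is schedulable on (host, exec)
    // only if neither the node, the executor, nor that specific task is blacklisted there.
    def offerable(bl: TaskSetBlacklist, host: String, exec: String, index: Int): Boolean = {
      !bl.isNodeBlacklistedForTaskSet(host) &&
        !bl.isExecutorBlacklistedForTaskSet(exec) &&
        !bl.isNodeBlacklistedForTask(host, index) &&
        !bl.isExecutorBlacklistedForTask(exec, index)
    }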
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 7d6ad08036..69edcf3347 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.mockito.Mockito.{mock, verify}
import org.apache.spark._
+import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -103,7 +104,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val host = executorIdToHost.get(execId)
assert(host != None)
val hostId = host.get
- val executorsOnHost = executorsByHost(hostId)
+ val executorsOnHost = hostToExecutors(hostId)
executorsOnHost -= execId
for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
hosts -= hostId
@@ -125,7 +126,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
def addExecutor(execId: String, host: String) {
executors.put(execId, host)
- val executorsOnHost = executorsByHost.getOrElseUpdate(host, new mutable.HashSet[String])
+ val executorsOnHost = hostToExecutors.getOrElseUpdate(host, new mutable.HashSet[String])
executorsOnHost += execId
executorIdToHost += execId -> host
for (rack <- getRackForHost(host)) {
@@ -411,7 +412,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("executors should be blacklisted after task failure, in spite of locality preferences") {
val rescheduleDelay = 300L
val conf = new SparkConf().
- set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
+ set(config.BLACKLIST_ENABLED, true).
+ set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay).
// don't wait to jump locality levels in this test
set("spark.locality.wait", "0")
@@ -475,19 +477,24 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
}
- // After reschedule delay, scheduling on exec1 should be possible.
+ // Despite advancing beyond the blacklist timeout, executors are *never* expired from
+ // *within* the stage-level blacklist.
clock.advance(rescheduleDelay)
{
val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
- assert(offerResult.isDefined, "Expect resource offer to return a task")
+ assert(offerResult.isEmpty)
+ }
+ {
+ val offerResult = manager.resourceOffer("exec3", "host3", ANY)
+ assert(offerResult.isDefined)
assert(offerResult.get.index === 0)
- assert(offerResult.get.executorId === "exec1")
+ assert(offerResult.get.executorId === "exec3")
- assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
- // Cause exec1 to fail : failure 4
+ // Cause exec3 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
}
@@ -859,6 +866,114 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(sched.endedTasks(3) === Success)
}
+ test("Killing speculative tasks does not count towards aborting the taskset") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(5)
+ // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+ sc.conf.set("spark.speculation.multiplier", "0.0")
+ sc.conf.set("spark.speculation.quantile", "0.6")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+ // Offer resources for 5 tasks to start
+ val tasks = new ArrayBuffer[TaskDescription]()
+ for ((k, v) <- List(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
+ val taskOption = manager.resourceOffer(k, v, NO_PREF)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === k)
+ tasks += task
+ }
+ assert(sched.startedTasks.toSet === (0 until 5).toSet)
+ // Complete 3 tasks and leave 2 tasks in running
+ for (id <- Set(0, 1, 2)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ def runningTaskForIndex(index: Int): TaskDescription = {
+ tasks.find { task =>
+ task.index == index && !sched.endedTasks.contains(task.taskId)
+ }.getOrElse {
+ throw new RuntimeException(s"couldn't find index $index in " +
+ s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" +
+ s" ${sched.endedTasks.keys}")
+ }
+ }
+
+ // have each of the running tasks fail 3 times (not enough to abort the stage)
+ (0 until 3).foreach { attempt =>
+ Seq(3, 4).foreach { index =>
+ val task = runningTaskForIndex(index)
+ logInfo(s"failing task $task")
+ val endReason = ExceptionFailure("a", "b", Array(), "c", None)
+ manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason)
+ sched.endedTasks(task.taskId) = endReason
+ assert(!manager.isZombie)
+ val nextTask = manager.resourceOffer("exec2", "host2", NO_PREF)
+ assert(nextTask.isDefined, s"no offer for attempt $attempt of $index")
+ tasks += nextTask.get
+ }
+ }
+
+ // we can't be sure which one of our running tasks will get another speculative copy
+ val originalTasks = Seq(3, 4).map { index => index -> runningTaskForIndex(index) }.toMap
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+ // > 0ms, so advance the clock by 1ms here.
+ clock.advance(1)
+ assert(manager.checkSpeculatableTasks(0))
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+ assert(taskOption5.isDefined)
+ val speculativeTask = taskOption5.get
+ assert(speculativeTask.index === 3 || speculativeTask.index === 4)
+ assert(speculativeTask.taskId === 11)
+ assert(speculativeTask.executorId === "exec1")
+ assert(speculativeTask.attemptNumber === 4)
+ sched.backend = mock(classOf[SchedulerBackend])
+ // Complete the speculative attempt for the running task
+ manager.handleSuccessfulTask(speculativeTask.taskId, createTaskResult(3, accumUpdatesByTask(3)))
+ // Verify that it kills other running attempt
+ val origTask = originalTasks(speculativeTask.index)
+ verify(sched.backend).killTask(origTask.taskId, "exec2", true)
+ // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
+ // killed, so the FakeTaskScheduler is only told about the successful completion
+ // of the speculated task.
+ assert(sched.endedTasks(3) === Success)
+ // also because the scheduler is a mock, our manager isn't notified about the task killed event,
+ // so we do that manually
+ manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled)
+ // this task has "failed" 4 times, but one of them doesn't count, so keep running the stage
+ assert(manager.tasksSuccessful === 4)
+ assert(!manager.isZombie)
+
+ // now run another speculative task
+ val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)
+ assert(taskOpt6.isDefined)
+ val speculativeTask2 = taskOpt6.get
+ assert(speculativeTask2.index === 3 || speculativeTask2.index === 4)
+ assert(speculativeTask2.index !== speculativeTask.index)
+ assert(speculativeTask2.attemptNumber === 4)
+ // Complete the speculative attempt for the running task
+ manager.handleSuccessfulTask(speculativeTask2.taskId,
+ createTaskResult(3, accumUpdatesByTask(3)))
+ // Verify that it kills other running attempt
+ val origTask2 = originalTasks(speculativeTask2.index)
+ verify(sched.backend).killTask(origTask2.taskId, "exec2", true)
+ assert(manager.tasksSuccessful === 5)
+ assert(manager.isZombie)
+ }
+
private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
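As a reading aid for the speculative-kill test above, a minimal sketch of the failure accounting it asserts. The counting loop and `failuresThatCount` are illustrative only; the end reasons, and the fact that killing the original copy does not push the task set toward abort, are taken straight from the test (spark.task.maxFailures defaults to 4):

    import org.apache.spark.{ExceptionFailure, TaskEndReason, TaskKilled}

    // Illustrative only: the end reasons the test feeds to handleFailedTask for one index.
    val endReasons: Seq[TaskEndReason] =
      Seq.fill(3)(ExceptionFailure("a", "b", Array(), "c", None)) :+ TaskKilled
    // The three ExceptionFailures count toward spark.task.maxFailures; the KILLED original copy
    // does not, which is why the suite still sees tasksSuccessful === 4 and !manager.isZombie.
    val failuresThatCount = endReasons.count {
      case _: ExceptionFailure => true
      case TaskKilled => false
      case _ => true
    }
    assert(failuresThatCount == 3)  // still below the default limit of 4, so the stage keeps running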
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index c1484b0afa..46aa9c3798 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.serializer
import com.esotericsoftware.kryo.Kryo
import org.apache.spark._
+import org.apache.spark.internal.config
import org.apache.spark.serializer.KryoDistributedTest._
import org.apache.spark.util.Utils
@@ -29,7 +30,8 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
val conf = new SparkConf(false)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
- .set("spark.task.maxFailures", "1")
+ .set(config.MAX_TASK_FAILURES, 1)
+ .set(config.BLACKLIST_ENABLED, false)
val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
conf.setJars(List(jar.getPath))
diff --git a/docs/configuration.md b/docs/configuration.md
index 82ce232b33..373e22d71a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1246,6 +1246,49 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.blacklist.enabled</code></td>
+ <td>
+ false
+ </td>
+ <td>
+ If set to "true", prevent Spark from scheduling tasks on executors that have been blacklisted
+ due to too many task failures. The blacklisting algorithm can be further controlled by the
+ other "spark.blacklist" configuration options.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.blacklist.task.maxTaskAttemptsPerExecutor</code></td>
+ <td>1</td>
+ <td>
+ (Experimental) For a given task, how many times it can be retried on one executor before the
+ executor is blacklisted for that task.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.blacklist.task.maxTaskAttemptsPerNode</code></td>
+ <td>2</td>
+ <td>
+ (Experimental) For a given task, how many times it can be retried on one node before the entire
+ node is blacklisted for that task.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.blacklist.stage.maxFailedTasksPerExecutor</code></td>
+ <td>2</td>
+ <td>
+ (Experimental) How many different tasks must fail on one executor, within one stage, before the
+ executor is blacklisted for that stage.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.blacklist.stage.maxFailedExecutorsPerNode</code></td>
+ <td>2</td>
+ <td>
+ (Experimental) How many different executors must be blacklisted for a given stage before the
+ entire node is blacklisted for the stage.
+ </td>
+</tr>
+<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
<td>
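The rows added to configuration.md above are ordinary Spark properties, so they can be set like any other key. A hedged usage sketch; only the key names come from the table, the values are just examples:

    import org.apache.spark.SparkConf

    // Hedged sketch: turn blacklisting on and loosen the per-node retry limit slightly.
    val conf = new SparkConf()
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.task.maxTaskAttemptsPerNode", "3")
      .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")

The same keys could equally be placed in spark-defaults.conf or passed on the spark-submit command line via --conf.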
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6e60b0e4fa..19b6d26031 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -23,6 +23,7 @@ import org.mockito.Mockito.mock
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -446,7 +447,7 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
- .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this test quickly
+ .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
val sc = new SparkContext(conf)
try {