path: root/core
author    Sital Kedia <skedia@fb.com>  2016-02-16 22:27:34 -0800
committer Kay Ousterhout <kayousterhout@gmail.com>  2016-02-16 22:27:39 -0800
commit    1e1e31e03df14f2e7a9654e640fb2796cf059fe0 (patch)
tree      f463cdf7eb8b5bc63508a4b314a5b3e287761d49 /core
parent    7218c0eba957e0a079a407b79c3a050cce9647b2 (diff)
[SPARK-13279] Remove O(n^2) operation from scheduler.
This commit removes an unnecessary duplicate check in addPendingTask that meant
that scheduling a task set took time proportional to (# tasks)^2.

Author: Sital Kedia <skedia@fb.com>

Closes #11175 from sitalkedia/fix_stuck_driver.
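For illustration only, here is a self-contained Scala sketch (not part of the commit; the object and method names are made up) of why guarding every append with ArrayBuffer.contains makes enqueueing n tasks quadratic, while appending unconditionally keeps it linear:

import scala.collection.mutable.ArrayBuffer

object PendingListSketch {
  // Old-style insertion: scan the list before every append.
  // Adding n tasks performs roughly 1 + 2 + ... + n comparisons, i.e. O(n^2) work.
  def addAllWithDedup(n: Int): ArrayBuffer[Int] = {
    val list = new ArrayBuffer[Int]
    for (index <- 0 until n) {
      if (!list.contains(index)) { // O(current size) scan per task
        list += index
      }
    }
    list
  }

  // New-style insertion: append unconditionally (amortized O(1) per task)
  // and let the dequeue path skip duplicates instead.
  def addAll(n: Int): ArrayBuffer[Int] = {
    val list = new ArrayBuffer[Int]
    for (index <- 0 until n) {
      list += index
    }
    list
  }

  def main(args: Array[String]): Unit = {
    val n = 100000
    def timeMs[T](body: => T): Long = {
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1000000
    }
    println(s"with contains check: ${timeMs(addAllWithDedup(n))} ms")
    println(s"plain append:        ${timeMs(addAll(n))} ms")
  }
}

With hundreds of thousands of tasks in a task set, the contains-guarded version dominates scheduling time, which matches the stuck-driver symptom the branch name refers to.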
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 28
1 file changed, 13 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index cf97877476..05cfa52f16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
- // back at the head of the stack. They are also only cleaned up lazily;
- // when a task is launched, it remains in all the pending lists except
- // the one that it was launched from, but gets removed from them later.
+ // back at the head of the stack. These collections may contain duplicates
+ // for two reasons:
+ // (1): Tasks are only removed lazily; when a task is launched, it remains
+ // in all the pending lists except the one that it was launched from.
+ // (2): Tasks may be re-added to these lists multiple times as a result
+ // of failures.
+ // Duplicates are handled in dequeueTaskFromList, which ensures that a
+ // task hasn't already started running before launching it.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
- // Utility method that adds `index` to a list only if it's not already there
- def addTo(list: ArrayBuffer[Int]) {
- if (!list.contains(index)) {
- list += index
- }
- }
-
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
- addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+ pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation => {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) => {
for (e <- set) {
- addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+ pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
@@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
}
case _ => Unit
}
- addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+ pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
- addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+ pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
if (tasks(index).preferredLocations == Nil) {
- addTo(pendingTasksWithNoPrefs)
+ pendingTasksWithNoPrefs += index
}
allPendingTasks += index // No point scanning this whole list to find the old task there
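The removed addTo guard relied on list.contains to keep the pending lists duplicate-free; after this change duplicates are tolerated and filtered out when tasks are dequeued, as the new comment in the first hunk describes. A minimal sketch of that dequeue-side check follows; it is not the real TaskSetManager.dequeueTaskFromList, and copiesRunning/successful are stand-ins for the scheduler's bookkeeping:

import scala.collection.mutable.ArrayBuffer

class DequeueSketch(numTasks: Int) {
  // How many copies of each task are currently running, and which tasks have finished.
  private val copiesRunning = new Array[Int](numTasks)
  private val successful = new Array[Boolean](numTasks)

  // Pop indices off the end of the (stack-like) pending list until one is found
  // that has not already started or finished; stale duplicates are dropped here.
  def dequeueTask(list: ArrayBuffer[Int]): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.size - 1)
      if (copiesRunning(index) == 0 && !successful(index)) {
        copiesRunning(index) += 1 // mark as launched so later duplicates are skipped
        return Some(index)
      }
    }
    None
  }
}

Because each pending entry is removed at most once here, the lazy cleanup adds only amortized constant work per enqueue/dequeue pair even when a task index appears in several lists.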