From 50fa6fd1b365d5db7e2b2c59624a365cef0d1696 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 11 Feb 2016 13:28:03 -0800 Subject: [SPARK-13279] Remove O(n^2) operation from scheduler. This commit removes an unnecessary duplicate check in addPendingTask that meant that scheduling a task set took time proportional to (# tasks)^2. Author: Sital Kedia Closes #11167 from sitalkedia/fix_stuck_driver and squashes the following commits: 3fe1af8 [Sital Kedia] [SPARK-13279] Remove unnecessary duplicate check in addPendingTask function --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cf97877476..4b19beb43f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -114,9 +114,14 @@ private[spark] class TaskSetManager( // treated as stacks, in which new tasks are added to the end of the // ArrayBuffer and removed from the end. This makes it faster to detect // tasks that repeatedly fail because whenever a task failed, it is put - // back at the head of the stack. They are also only cleaned up lazily; - // when a task is launched, it remains in all the pending lists except - // the one that it was launched from, but gets removed from them later. + // back at the head of the stack. These collections may contain duplicates + // for two reasons: + // (1): Tasks are only removed lazily; when a task is launched, it remains + // in all the pending lists except the one that it was launched from. + // (2): Tasks may be re-added to these lists multiple times as a result + // of failures. + // Duplicates are handled in dequeueTaskFromList, which ensures that a + // task hasn't already started running before launching it. private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]] // Set of pending tasks for each host. Similar to pendingTasksForExecutor, @@ -181,9 +186,7 @@ private[spark] class TaskSetManager( private def addPendingTask(index: Int) { // Utility method that adds `index` to a list only if it's not already there def addTo(list: ArrayBuffer[Int]) { - if (!list.contains(index)) { - list += index - } + list += index } for (loc <- tasks(index).preferredLocations) { -- cgit v1.2.3