From 38703bbca86003995f32b2e948ad7c7c358aa99a Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 15 Dec 2014 14:51:15 -0800 Subject: [SPARK-1037] The name of findTaskFromList & findTask in TaskSetManager.scala is confusing Hi all - I've renamed the methods referenced in this JIRA to clarify that they modify the provided arrays (find vs. deque). Author: Ilya Ganelin Closes #3665 from ilganeli/SPARK-1037B and squashes the following commits: 64c177c [Ilya Ganelin] Renamed deque to dequeue f27d85e [Ilya Ganelin] Renamed private methods to clarify that they modify the provided parameters 683482a [Ilya Ganelin] Renamed private methods to clarify that they modify the provided parameters --- .../apache/spark/scheduler/TaskSetManager.scala | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cabdc655f8..28e6147509 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -249,7 +249,7 @@ private[spark] class TaskSetManager( * This method also cleans up any tasks in the list that have already * been launched, since we want that to happen lazily. */ - private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = { + private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = { var indexOffset = list.size while (indexOffset > 0) { indexOffset -= 1 @@ -290,7 +290,7 @@ private[spark] class TaskSetManager( * an attempt running on this host, in case the host is slow. In addition, the task should meet * the given locality constraint. */ - private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) + private def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set @@ -366,22 +366,22 @@ private[spark] class TaskSetManager( * * @return An option containing (task index within the task set, locality, is speculative?) */ - private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value) + private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) : Option[(Int, TaskLocality.Value, Boolean)] = { - for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { + for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { - for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { + for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) { return Some((index, TaskLocality.NODE_LOCAL, false)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic - for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { + for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } } @@ -389,20 +389,20 @@ private[spark] class TaskSetManager( if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) - index <- findTaskFromList(execId, getPendingTasksForRack(rack)) + index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack)) } { return Some((index, TaskLocality.RACK_LOCAL, false)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { - for (index <- findTaskFromList(execId, allPendingTasks)) { + for (index <- dequeueTaskFromList(execId, allPendingTasks)) { return Some((index, TaskLocality.ANY, false)) } } // find a speculative task if all others tasks have been scheduled - findSpeculativeTask(execId, host, maxLocality).map { + dequeueSpeculativeTask(execId, host, maxLocality).map { case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} } @@ -436,7 +436,7 @@ private[spark] class TaskSetManager( } } - findTask(execId, host, allowedLocality) match { + dequeueTask(execId, host, allowedLocality) match { case Some((index, taskLocality, speculative)) => { // Found a task; do some bookkeeping and return a task description val task = tasks(index) @@ -704,7 +704,7 @@ private[spark] class TaskSetManager( // Re-enqueue pending tasks for this host based on the status of the cluster. Note // that it's okay if we add a task to the same queue twice (if it had multiple preferred - // locations), because findTaskFromList will skip already-running tasks. + // locations), because dequeueTaskFromList will skip already-running tasks. for (index <- getPendingTasksForExecutor(execId)) { addPendingTask(index, readding=true) } -- cgit v1.2.3