From 51be633657800d470de5dcebbed09e6bf08f6e2a Mon Sep 17 00:00:00 2001
From: jinxing
Date: Wed, 1 Mar 2017 21:15:22 -0800
Subject: [SPARK-19777] Scan runningTasksSet when checking speculatable tasks
 in TaskSetManager.

## What changes were proposed in this pull request?

When checking for speculatable tasks in `TaskSetManager`, scan only the `runningTasksSet` instead of all `taskInfos`.

## How was this patch tested?

Existing tests.

Author: jinxing

Closes #17111 from jinxing64/SPARK-19777.
---
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3b25513bea..e63feb8893 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -906,8 +906,6 @@ private[spark] class TaskSetManager(
    * Check for tasks to be speculated and return true if there are any. This is called periodically
    * by the TaskScheduler.
    *
-   * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
-   * we don't scan the whole task set. It might also help to make this sorted by launch time.
    */
   override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
     // Can't speculate if we only have one task, and no need to speculate if the task set is a
@@ -927,7 +925,8 @@ private[spark] class TaskSetManager(
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
-      for ((tid, info) <- taskInfos) {
+      for (tid <- runningTasksSet) {
+        val info = taskInfos(tid)
         val index = info.index
         if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
           !speculatableTasks.contains(index)) {
-- 
cgit v1.2.3
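
As a rough illustration of why this helps, here is a minimal, self-contained Scala sketch. It is not Spark's code: `Info`, `SpeculationScanSketch`, the task counts, and the threshold are all made-up stand-ins. The point is that when most tasks in a large task set have already finished, iterating only the running task ids and looking each one up does far less work than walking the whole `taskInfos` map, while yielding the same speculation candidates.

```scala
import scala.collection.mutable

object SpeculationScanSketch {
  // Hypothetical stand-in for Spark's TaskInfo: just the fields this scan needs.
  final case class Info(index: Int, launchTime: Long) {
    def timeRunning(now: Long): Long = now - launchTime
  }

  def main(args: Array[String]): Unit = {
    val taskInfos = mutable.HashMap.empty[Long, Info]   // every task ever launched
    val runningTasksSet = mutable.HashSet.empty[Long]   // tasks still running

    // 100,000 tasks launched; all but the last 10 have already finished.
    for (tid <- 0L until 100000L) {
      taskInfos(tid) = Info(tid.toInt, launchTime = tid)
      if (tid >= 99990L) runningTasksSet += tid
    }

    val now = 200000L
    val threshold = 50000L

    // Old scan: walk every entry in taskInfos, running or not (~100,000 iterations).
    val oldScan = taskInfos.iterator.collect {
      case (tid, info) if runningTasksSet(tid) && info.timeRunning(now) > threshold => tid
    }.toSet

    // New scan: walk only the running task ids (~10 iterations), then look each info up.
    val newScan =
      runningTasksSet.filter(tid => taskInfos(tid).timeRunning(now) > threshold).toSet

    assert(oldScan == newScan)
    println(s"speculation candidates: ${newScan.toList.sorted.mkString(", ")}")
  }
}
```

In the real `TaskSetManager`, the per-task filter also consults `successful(index)` and `copiesRunning(index)`, as the diff above shows; the membership check against the running set in the sketch merely approximates that guard for illustration.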