aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjinxing <jinxing6042@126.com>2017-03-01 21:15:22 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-03-01 21:15:22 -0800
commit51be633657800d470de5dcebbed09e6bf08f6e2a (patch)
tree078cb55791ba471fe4a7d45ccb24612f79b04e96
parentdb0ddce523bb823cba996e92ef36ceca31492d2c (diff)
downloadspark-51be633657800d470de5dcebbed09e6bf08f6e2a.tar.gz
spark-51be633657800d470de5dcebbed09e6bf08f6e2a.tar.bz2
spark-51be633657800d470de5dcebbed09e6bf08f6e2a.zip
[SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager.
## What changes were proposed in this pull request? When check speculatable tasks in `TaskSetManager`, only scan `runningTasksSet` instead of scanning all `taskInfos`. ## How was this patch tested? Existing tests. Author: jinxing <jinxing6042@126.com> Closes #17111 from jinxing64/SPARK-19777.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala5
1 files changed, 2 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3b25513bea..e63feb8893 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -906,8 +906,6 @@ private[spark] class TaskSetManager(
* Check for tasks to be speculated and return true if there are any. This is called periodically
* by the TaskScheduler.
*
- * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
- * we don't scan the whole task set. It might also help to make this sorted by launch time.
*/
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
// Can't speculate if we only have one task, and no need to speculate if the task set is a
@@ -927,7 +925,8 @@ private[spark] class TaskSetManager(
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
- for ((tid, info) <- taskInfos) {
+ for (tid <- runningTasksSet) {
+ val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {