aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2014-08-11 19:15:01 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-11 19:15:01 -0700
commit7712e724ad69dd0b83754e938e9799d13a4d43b9 (patch)
tree74f2c4f7f9d31d3f2b7f641c8bb4b5667e10f3d9 /core/src/main
parent37338666655909502e424b4639d680271d6d4c12 (diff)
downloadspark-7712e724ad69dd0b83754e938e9799d13a4d43b9.tar.gz
spark-7712e724ad69dd0b83754e938e9799d13a4d43b9.tar.bz2
spark-7712e724ad69dd0b83754e938e9799d13a4d43b9.zip
[SPARK-2931] In TaskSetManager, reset currentLocalityIndex after recomputing locality levels
This addresses SPARK-2931, a bug where getAllowedLocalityLevel() could throw ArrayIndexOutOfBoundsException. The fix here is to reset currentLocalityIndex after recomputing the locality levels. Thanks to kayousterhout, mridulm, and lirui-intel for helping me to debug this. Author: Josh Rosen <joshrosen@apache.org> Closes #1896 from JoshRosen/SPARK-2931 and squashes the following commits: 48b60b5 [Josh Rosen] Move FakeRackUtil.cleanUp() info beforeEach(). 6fec474 [Josh Rosen] Set currentLocalityIndex after recomputing locality levels. 9384897 [Josh Rosen] Update SPARK-2931 test to reflect changes in 63bdb1f41b4895e3a9444f7938094438a94d3007. 9ecd455 [Josh Rosen] Apply @mridulm's patch for reproducing SPARK-2931.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala11
1 files changed, 8 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 20a4bd12f9..d9d53faf84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -690,8 +690,7 @@ private[spark] class TaskSetManager(
handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
}
// recalculate valid locality levels and waits when executor is lost
- myLocalityLevels = computeValidLocalityLevels()
- localityWaits = myLocalityLevels.map(getLocalityWait)
+ recomputeLocality()
}
/**
@@ -775,9 +774,15 @@ private[spark] class TaskSetManager(
levels.toArray
}
- def executorAdded() {
+ def recomputeLocality() {
+ val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
+ currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
+ }
+
+ def executorAdded() {
+ recomputeLocality()
}
}