author    Josh Rosen <joshrosen@apache.org>  2014-08-11 19:15:01 -0700
committer Josh Rosen <joshrosen@apache.org>  2014-08-11 19:15:01 -0700
commit    7712e724ad69dd0b83754e938e9799d13a4d43b9 (patch)
tree      74f2c4f7f9d31d3f2b7f641c8bb4b5667e10f3d9 /core/src/main
parent    37338666655909502e424b4639d680271d6d4c12 (diff)
[SPARK-2931] In TaskSetManager, reset currentLocalityIndex after recomputing locality levels
This addresses SPARK-2931, a bug where getAllowedLocalityLevel() could throw ArrayIndexOutOfBoundsException. The fix here is to reset currentLocalityIndex after recomputing the locality levels.
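The failure mode can be sketched outside Spark in a few lines: when the set of valid locality levels shrinks (for example after an executor is lost), an index that was valid against the old array can point past the end of the new one. The sketch below uses hypothetical names (`before`, `after`, and a standalone `getLocalityIndex` that takes the levels array as a parameter); it is not the actual TaskSetManager code, only an illustration of why the index must be remapped rather than reused.

```scala
// Minimal sketch of SPARK-2931 (hypothetical names, not the real TaskSetManager).
object LocalityIndexSketch {
  // Locality levels before an executor is lost, ordered strictest to least strict.
  val before = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  var currentLocalityIndex = 3 // the scheduler has advanced all the way to ANY

  // After recomputation fewer levels remain; index 3 is now out of bounds.
  val after = Array("PROCESS_LOCAL", "ANY")

  // Map the previously selected level into the new array; fall back to the
  // least strict level if it is no longer present.
  def getLocalityIndex(levels: Array[String], level: String): Int = {
    val i = levels.indexOf(level)
    if (i >= 0) i else levels.length - 1
  }

  def main(args: Array[String]): Unit = {
    // Reusing currentLocalityIndex directly against `after` would throw
    // ArrayIndexOutOfBoundsException; remapping by level is safe.
    val previousLevel = before(currentLocalityIndex) // "ANY"
    currentLocalityIndex = getLocalityIndex(after, previousLevel)
    println(currentLocalityIndex) // 1
  }
}
```

This mirrors the shape of the fix: capture the previously selected level, recompute the levels, then derive a fresh index from the level rather than carrying the stale index forward.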
Thanks to kayousterhout, mridulm, and lirui-intel for helping me to debug this.
Author: Josh Rosen <joshrosen@apache.org>
Closes #1896 from JoshRosen/SPARK-2931 and squashes the following commits:
48b60b5 [Josh Rosen] Move FakeRackUtil.cleanUp() into beforeEach().
6fec474 [Josh Rosen] Set currentLocalityIndex after recomputing locality levels.
9384897 [Josh Rosen] Update SPARK-2931 test to reflect changes in 63bdb1f41b4895e3a9444f7938094438a94d3007.
9ecd455 [Josh Rosen] Apply @mridulm's patch for reproducing SPARK-2931.
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 11
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 20a4bd12f9..d9d53faf84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -690,8 +690,7 @@ private[spark] class TaskSetManager(
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
     }
     // recalculate valid locality levels and waits when executor is lost
-    myLocalityLevels = computeValidLocalityLevels()
-    localityWaits = myLocalityLevels.map(getLocalityWait)
+    recomputeLocality()
   }
 
   /**
@@ -775,9 +774,15 @@ private[spark] class TaskSetManager(
     levels.toArray
   }
 
-  def executorAdded() {
+  def recomputeLocality() {
+    val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
+    currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
+  }
+
+  def executorAdded() {
+    recomputeLocality()
   }
 }