diff options
author | Andy Sloane <asloane@tetrationanalytics.com> | 2016-03-09 10:25:47 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-09 10:25:47 +0000 |
commit | cbff2803ef117d7cffe6f05fc1bbd395a1e9c587 (patch) | |
tree | ea2c9e175d8702754929c4e93810d71a971e828b /core | |
parent | 2c5af7d4d939e18a749d33b5de2e5113aa3eff08 (diff) | |
download | spark-cbff2803ef117d7cffe6f05fc1bbd395a1e9c587.tar.gz spark-cbff2803ef117d7cffe6f05fc1bbd395a1e9c587.tar.bz2 spark-cbff2803ef117d7cffe6f05fc1bbd395a1e9c587.zip |
[SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs
## What changes were proposed in this pull request?
If a job is being scheduled in one thread which has a dependency on an
RDD currently executing a shuffle in another thread, Spark would throw a
NullPointerException. This patch synchronizes access to `mapStatuses` and
skips null status entries (which are in-progress shuffle tasks).
## How was this patch tested?
Our client code unit test suite, which was reliably reproducing the race
condition with 10 threads, shows that this fixes it. I have not found a minimal
test case to add to Spark, but I will attempt to do so if desired.
The same test case was tripping up on SPARK-4454, which was fixed by
making other DAGScheduler code thread-safe.
shivaram srowen
Author: Andy Sloane <asloane@tetrationanalytics.com>
Closes #11505 from a1k0n/SPARK-13631.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index eb2fdecc83..9cb6159790 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -376,8 +376,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) * @param numReducers total number of reducers in the shuffle * @param fractionThreshold fraction of total map output size that a location must have * for it to be considered large. - * - * This method is not thread-safe. */ def getLocationsWithLargestOutputs( shuffleId: Int, @@ -386,28 +384,36 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) fractionThreshold: Double) : Option[Array[BlockManagerId]] = { - if (mapStatuses.contains(shuffleId)) { - val statuses = mapStatuses(shuffleId) - if (statuses.nonEmpty) { - // HashMap to add up sizes of all blocks at the same location - val locs = new HashMap[BlockManagerId, Long] - var totalOutputSize = 0L - var mapIdx = 0 - while (mapIdx < statuses.length) { - val status = statuses(mapIdx) - val blockSize = status.getSizeForBlock(reducerId) - if (blockSize > 0) { - locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize - totalOutputSize += blockSize + val statuses = mapStatuses.get(shuffleId).orNull + if (statuses != null) { + statuses.synchronized { + if (statuses.nonEmpty) { + // HashMap to add up sizes of all blocks at the same location + val locs = new HashMap[BlockManagerId, Long] + var totalOutputSize = 0L + var mapIdx = 0 + while (mapIdx < statuses.length) { + val status = statuses(mapIdx) + // status may be null here if we are called between registerShuffle, which creates an + // array with null entries for each output, and registerMapOutputs, which populates it + // with valid status entries. This is possible if one thread schedules a job which + // depends on an RDD which is currently being computed by another thread. + if (status != null) { + val blockSize = status.getSizeForBlock(reducerId) + if (blockSize > 0) { + locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize + totalOutputSize += blockSize + } + } + mapIdx = mapIdx + 1 + } + val topLocs = locs.filter { case (loc, size) => + size.toDouble / totalOutputSize >= fractionThreshold + } + // Return if we have any locations which satisfy the required threshold + if (topLocs.nonEmpty) { + return Some(topLocs.keys.toArray) } - mapIdx = mapIdx + 1 - } - val topLocs = locs.filter { case (loc, size) => - size.toDouble / totalOutputSize >= fractionThreshold - } - // Return if we have any locations which satisfy the required threshold - if (topLocs.nonEmpty) { - return Some(topLocs.map(_._1).toArray) } } } |