about summary refs log tree commit diff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorAndy Sloane <asloane@tetrationanalytics.com>2016-03-09 10:25:47 +0000
committerSean Owen <sowen@cloudera.com>2016-03-09 10:25:47 +0000
commitcbff2803ef117d7cffe6f05fc1bbd395a1e9c587 (patch)
treeea2c9e175d8702754929c4e93810d71a971e828b /core/src/main/scala
parent2c5af7d4d939e18a749d33b5de2e5113aa3eff08 (diff)
downloadspark-cbff2803ef117d7cffe6f05fc1bbd395a1e9c587.tar.gz
spark-cbff2803ef117d7cffe6f05fc1bbd395a1e9c587.tar.bz2
spark-cbff2803ef117d7cffe6f05fc1bbd395a1e9c587.zip
[SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs
## What changes were proposed in this pull request?

If a job is being scheduled in one thread which has a dependency on an RDD currently executing a shuffle in another thread, Spark would throw a NullPointerException. This patch synchronizes access to `mapStatuses` and skips null status entries (which are in-progress shuffle tasks).

## How was this patch tested?

Our client code unit test suite, which was reliably reproducing the race condition with 10 threads, shows that this fixes it. I have not found a minimal test case to add to Spark, but I will attempt to do so if desired. The same test case was tripping up on SPARK-4454, which was fixed by making other DAGScheduler code thread-safe.

shivaram srowen

Author: Andy Sloane <asloane@tetrationanalytics.com>

Closes #11505 from a1k0n/SPARK-13631.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala52
1 file changed, 29 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index eb2fdecc83..9cb6159790 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -376,8 +376,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* @param numReducers total number of reducers in the shuffle
* @param fractionThreshold fraction of total map output size that a location must have
* for it to be considered large.
- *
- * This method is not thread-safe.
*/
def getLocationsWithLargestOutputs(
shuffleId: Int,
@@ -386,28 +384,36 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {
- if (mapStatuses.contains(shuffleId)) {
- val statuses = mapStatuses(shuffleId)
- if (statuses.nonEmpty) {
- // HashMap to add up sizes of all blocks at the same location
- val locs = new HashMap[BlockManagerId, Long]
- var totalOutputSize = 0L
- var mapIdx = 0
- while (mapIdx < statuses.length) {
- val status = statuses(mapIdx)
- val blockSize = status.getSizeForBlock(reducerId)
- if (blockSize > 0) {
- locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
- totalOutputSize += blockSize
+ val statuses = mapStatuses.get(shuffleId).orNull
+ if (statuses != null) {
+ statuses.synchronized {
+ if (statuses.nonEmpty) {
+ // HashMap to add up sizes of all blocks at the same location
+ val locs = new HashMap[BlockManagerId, Long]
+ var totalOutputSize = 0L
+ var mapIdx = 0
+ while (mapIdx < statuses.length) {
+ val status = statuses(mapIdx)
+ // status may be null here if we are called between registerShuffle, which creates an
+ // array with null entries for each output, and registerMapOutputs, which populates it
+ // with valid status entries. This is possible if one thread schedules a job which
+ // depends on an RDD which is currently being computed by another thread.
+ if (status != null) {
+ val blockSize = status.getSizeForBlock(reducerId)
+ if (blockSize > 0) {
+ locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+ totalOutputSize += blockSize
+ }
+ }
+ mapIdx = mapIdx + 1
+ }
+ val topLocs = locs.filter { case (loc, size) =>
+ size.toDouble / totalOutputSize >= fractionThreshold
+ }
+ // Return if we have any locations which satisfy the required threshold
+ if (topLocs.nonEmpty) {
+ return Some(topLocs.keys.toArray)
}
- mapIdx = mapIdx + 1
- }
- val topLocs = locs.filter { case (loc, size) =>
- size.toDouble / totalOutputSize >= fractionThreshold
- }
- // Return if we have any locations which satisfy the required threshold
- if (topLocs.nonEmpty) {
- return Some(topLocs.map(_._1).toArray)
}
}
}