diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-12-14 11:41:16 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-12-14 11:41:16 -0800 |
commit | b82a6dd2c72d6555aeaa2b523ddf564434f5e10c (patch) | |
tree | 42cae50b2e20044a4a97231c2f70f55d42ff2f70 /core | |
parent | 1072f970cc8da29e3ebabf746e1016a8c6e9fa7e (diff) | |
parent | cf52d9cade9a4df32a763073f7ad981465c91072 (diff) | |
download | spark-b82a6dd2c72d6555aeaa2b523ddf564434f5e10c.tar.gz spark-b82a6dd2c72d6555aeaa2b523ddf564434f5e10c.tar.bz2 spark-b82a6dd2c72d6555aeaa2b523ddf564434f5e10c.zip |
Merge pull request #332 from JoshRosen/spark-607
Add try-finally to handle MapOutputTracker timeouts
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/MapOutputTracker.scala | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 50c4183c0e..70eb9f702e 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -148,18 +148,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) val host = System.getProperty("spark.hostname", Utils.localHostName) - val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]] - val fetchedStatuses = deserializeStatuses(fetchedBytes) - - logInfo("Got the output locations") - mapStatuses.put(shuffleId, fetchedStatuses) - fetching.synchronized { - fetching -= shuffleId - fetching.notifyAll() - } - if (fetchedStatuses.contains(null)) { - throw new FetchFailedException(null, shuffleId, -1, reduceId, - new Exception("Missing an output location for shuffle " + shuffleId)) + // This try-finally prevents hangs due to timeouts: + var fetchedStatuses: Array[MapStatus] = null + try { + val fetchedBytes = + askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]] + fetchedStatuses = deserializeStatuses(fetchedBytes) + logInfo("Got the output locations") + mapStatuses.put(shuffleId, fetchedStatuses) + if (fetchedStatuses.contains(null)) { + throw new FetchFailedException(null, shuffleId, -1, reduceId, + new Exception("Missing an output location for shuffle " + shuffleId)) + } + } finally { + fetching.synchronized { + fetching -= shuffleId + fetching.notifyAll() + } } return fetchedStatuses.map(s => (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) |