aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-12-14 11:41:16 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-12-14 11:41:16 -0800
commitb82a6dd2c72d6555aeaa2b523ddf564434f5e10c (patch)
tree42cae50b2e20044a4a97231c2f70f55d42ff2f70
parent1072f970cc8da29e3ebabf746e1016a8c6e9fa7e (diff)
parentcf52d9cade9a4df32a763073f7ad981465c91072 (diff)
downloadspark-b82a6dd2c72d6555aeaa2b523ddf564434f5e10c.tar.gz
spark-b82a6dd2c72d6555aeaa2b523ddf564434f5e10c.tar.bz2
spark-b82a6dd2c72d6555aeaa2b523ddf564434f5e10c.zip
Merge pull request #332 from JoshRosen/spark-607
Add try-finally to handle MapOutputTracker timeouts
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala29
1 files changed, 17 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 50c4183c0e..70eb9f702e 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -148,18 +148,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
val host = System.getProperty("spark.hostname", Utils.localHostName)
- val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
- val fetchedStatuses = deserializeStatuses(fetchedBytes)
-
- logInfo("Got the output locations")
- mapStatuses.put(shuffleId, fetchedStatuses)
- fetching.synchronized {
- fetching -= shuffleId
- fetching.notifyAll()
- }
- if (fetchedStatuses.contains(null)) {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing an output location for shuffle " + shuffleId))
+ // This try-finally prevents hangs due to timeouts:
+ var fetchedStatuses: Array[MapStatus] = null
+ try {
+ val fetchedBytes =
+ askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
+ fetchedStatuses = deserializeStatuses(fetchedBytes)
+ logInfo("Got the output locations")
+ mapStatuses.put(shuffleId, fetchedStatuses)
+ if (fetchedStatuses.contains(null)) {
+ throw new FetchFailedException(null, shuffleId, -1, reduceId,
+ new Exception("Missing an output location for shuffle " + shuffleId))
+ }
+ } finally {
+ fetching.synchronized {
+ fetching -= shuffleId
+ fetching.notifyAll()
+ }
}
return fetchedStatuses.map(s =>
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))