diff options
Diffstat (limited to 'core/src/main/scala/spark/BlockStoreShuffleFetcher.scala')
-rw-r--r-- | core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index fb65ba421a..4554db2249 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -17,18 +17,18 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val blockManager = SparkEnv.get.blockManager val startTime = System.currentTimeMillis - val addresses = SparkEnv.get.mapOutputTracker.getServerAddresses(shuffleId) + val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) - val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[Int]] - for ((address, index) <- addresses.zipWithIndex) { - splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += index + val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] + for (((address, size), index) <- statuses.zipWithIndex) { + splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) } - val blocksByAddress: Seq[(BlockManagerId, Seq[String])] = splitsByAddress.toSeq.map { + val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map { case (address, splits) => - (address, splits.map(i => "shuffle_%d_%d_%d".format(shuffleId, i, reduceId))) + (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2))) } for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) { @@ -43,9 +43,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin case None => { val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r blockId match { - case regex(shufId, mapId, reduceId) => - val addr = addresses(mapId.toInt) - throw new FetchFailedException(addr, shufId.toInt, mapId.toInt, reduceId.toInt, null) + case regex(shufId, mapId, _) => + val address = statuses(mapId.toInt)._1 + throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null) case _ => throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block") |