diff options
author | Evan Racah <ejracah@gmail.com> | 2015-09-02 22:13:18 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-09-02 22:13:37 -0700 |
commit | f6c447f87592286a6f58aee5e0b2dc8dcb470d0c (patch) | |
tree | 54bbfbd391b0bb236f79e0d226aec61497e1aa9f | |
parent | 0985d2c30e031f80892987f7c3581d15dd210303 (diff) | |
download | spark-f6c447f87592286a6f58aee5e0b2dc8dcb470d0c.tar.gz spark-f6c447f87592286a6f58aee5e0b2dc8dcb470d0c.tar.bz2 spark-f6c447f87592286a6f58aee5e0b2dc8dcb470d0c.zip |
Removed code duplication in ShuffleBlockFetcherIterator
Added fetchUpToMaxBytes() to prevent having to update both code blocks when a change is made.
Author: Evan Racah <ejracah@gmail.com>
Closes #8514 from eracah/master.
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index a759ceb96e..0d0448feb5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator( fetchRequests ++= Utils.randomize(remoteRequests) // Send out initial requests for blocks, up to our maxBytesInFlight - while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) - } + fetchUpToMaxBytes() val numFetches = remoteRequests.size - fetchRequests.size logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) @@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator( case _ => } // Send fetch requests up to maxBytesInFlight - while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) - } + fetchUpToMaxBytes() result match { case FailureFetchResult(blockId, address, e) => @@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator( } } + private def fetchUpToMaxBytes(): Unit = { + // Send fetch requests up to maxBytesInFlight + while (fetchRequests.nonEmpty && + (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { + sendRequest(fetchRequests.dequeue()) + } + } + private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = { blockId match { case ShuffleBlockId(shufId, mapId, reduceId) => |