aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorEvan Racah <ejracah@gmail.com>2015-09-02 22:13:18 -0700
committerAndrew Or <andrew@databricks.com>2015-09-02 22:13:37 -0700
commitf6c447f87592286a6f58aee5e0b2dc8dcb470d0c (patch)
tree54bbfbd391b0bb236f79e0d226aec61497e1aa9f /core
parent0985d2c30e031f80892987f7c3581d15dd210303 (diff)
downloadspark-f6c447f87592286a6f58aee5e0b2dc8dcb470d0c.tar.gz
spark-f6c447f87592286a6f58aee5e0b2dc8dcb470d0c.tar.bz2
spark-f6c447f87592286a6f58aee5e0b2dc8dcb470d0c.zip
Removed code duplication in ShuffleBlockFetcherIterator
Added fetchUpToMaxBytes() to prevent having to update both code blocks when a change is made. Author: Evan Racah <ejracah@gmail.com> Closes #8514 from eracah/master.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala18
1 files changed, 10 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index a759ceb96e..0d0448feb5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator(
fetchRequests ++= Utils.randomize(remoteRequests)
// Send out initial requests for blocks, up to our maxBytesInFlight
- while (fetchRequests.nonEmpty &&
- (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
- sendRequest(fetchRequests.dequeue())
- }
+ fetchUpToMaxBytes()
val numFetches = remoteRequests.size - fetchRequests.size
logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
@@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator(
case _ =>
}
// Send fetch requests up to maxBytesInFlight
- while (fetchRequests.nonEmpty &&
- (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
- sendRequest(fetchRequests.dequeue())
- }
+ fetchUpToMaxBytes()
result match {
case FailureFetchResult(blockId, address, e) =>
@@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator(
}
}
+ private def fetchUpToMaxBytes(): Unit = {
+ // Send fetch requests up to maxBytesInFlight
+ while (fetchRequests.nonEmpty &&
+ (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+ sendRequest(fetchRequests.dequeue())
+ }
+ }
+
private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
blockId match {
case ShuffleBlockId(shufId, mapId, reduceId) =>