diff options
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index c61fd75c2b..2462721fb8 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -513,7 +513,7 @@ class BlockManager( } } - // Split local and remote blocks. Remote blocks are further split into FetchRequests of size + // Partition local and remote blocks. Remote blocks are further split into FetchRequests of size // at most maxBytesInFlight in order to limit the amount of data in flight. val remoteRequests = new ArrayBuffer[FetchRequest] for ((address, blockInfos) <- blocksByAddress) { @@ -585,7 +585,7 @@ class BlockManager( resultsGotten += 1 val result = results.take() bytesInFlight -= result.size - if (!fetchRequests.isEmpty && + while (!fetchRequests.isEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { sendRequest(fetchRequests.dequeue()) } @@ -950,6 +950,7 @@ class BlockManager( blockInfo.clear() memoryStore.clear() diskStore.clear() + metadataCleaner.cancel() logInfo("BlockManager stopped") } } |