diff options
author | Reynold Xin <rxin@apache.org> | 2014-08-28 19:00:40 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-08-28 19:00:40 -0700 |
commit | 665e71d14debb8a7fc1547c614867a8c3b1f806a (patch) | |
tree | 29f7dd70cd4cc5cbc053e3dd099808699df30115 | |
parent | 3c517a812e2e7cb5c00897b48024a716cc2b9a40 (diff) | |
download | spark-665e71d14debb8a7fc1547c614867a8c3b1f806a.tar.gz spark-665e71d14debb8a7fc1547c614867a8c3b1f806a.tar.bz2 spark-665e71d14debb8a7fc1547c614867a8c3b1f806a.zip |
[SPARK-1912] Lazily initialize buffers for local shuffle blocks.
This is a simplified fix for SPARK-1912.
Author: Reynold Xin <rxin@apache.org>
Closes #2179 from rxin/SPARK-1912 and squashes the following commits:
b2f0e9e [Reynold Xin] Fix unit tests.
a8eddfe [Reynold Xin] [SPARK-1912] Lazily initialize buffers for local shuffle blocks.
3 files changed, 20 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4ab8ec8f0f..d07e6a1b18 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -196,11 +196,8 @@ object BlockFetcherIterator {
       // any memory that might exceed our maxBytesInFlight
       for (id <- localBlocksToFetch) {
         try {
-          // getLocalFromDisk never return None but throws BlockException
-          val iter = getLocalFromDisk(id, serializer).get
-          // Pass 0 as size since it's not in flight
           readMetrics.localBlocksFetched += 1
-          results.put(new FetchResult(id, 0, () => iter))
+          results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
           logDebug("Got local block " + id)
         } catch {
           case e: Exception => {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1eb622c12a..cfe5b6c50a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-
-    def getIterator: Iterator[Any] = {
-      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-      serializer.newInstance().deserializeStream(stream).asIterator
-    }
-
-    if (blockId.isShuffle) {
-      /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-       * at the beginning. The wrapping will cost some memory (compression instance
-       * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
-       * wrapping lazily to save memory. */
-      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-        lazy val proxy = f
-        override def hasNext: Boolean = proxy.hasNext
-        override def next(): Any = proxy.next()
-      }
-      new LazyProxyIterator(getIterator)
-    } else {
-      getIterator
-    }
+    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+    serializer.newInstance().deserializeStream(stream).asIterator
   }
 
   def stop(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index 1591284383..fbfcb5156d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // 3rd getLocalFromDisk invocation should be failed
-    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
-    assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined")
+    assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
+    verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
-    assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined")
+    assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
+    verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
     // 3rd fetch should be failed
-    assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
-    // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator.
-    // Otherwise, BasicBlockFetcherIterator hangs up.
+    intercept[Exception] {
+      iterator.next()
+    }
+    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
   }
 
 
@@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // getLocalFromDis should be invoked for all of 5 blocks
-    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
@@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined")
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+
+    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
   }
 
   test("block fetch from remote fails using BasicBlockFetcherIterator") {