about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author: Reynold Xin <rxin@apache.org> 2014-08-28 19:00:40 -0700
committer: Reynold Xin <rxin@apache.org> 2014-08-28 19:00:40 -0700
commit: 665e71d14debb8a7fc1547c614867a8c3b1f806a (patch)
tree: 29f7dd70cd4cc5cbc053e3dd099808699df30115 /core
parent: 3c517a812e2e7cb5c00897b48024a716cc2b9a40 (diff)
download: spark-665e71d14debb8a7fc1547c614867a8c3b1f806a.tar.gz
spark-665e71d14debb8a7fc1547c614867a8c3b1f806a.tar.bz2
spark-665e71d14debb8a7fc1547c614867a8c3b1f806a.zip
[SPARK-1912] Lazily initialize buffers for local shuffle blocks.
This is a simplified fix for SPARK-1912.

Author: Reynold Xin <rxin@apache.org>

Closes #2179 from rxin/SPARK-1912 and squashes the following commits:

b2f0e9e [Reynold Xin] Fix unit tests.
a8eddfe [Reynold Xin] [SPARK-1912] Lazily initialize buffers for local shuffle blocks.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala | 5
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 22
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala | 28
3 files changed, 20 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4ab8ec8f0f..d07e6a1b18 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -196,11 +196,8 @@ object BlockFetcherIterator {
// any memory that might exceed our maxBytesInFlight
for (id <- localBlocksToFetch) {
try {
- // getLocalFromDisk never return None but throws BlockException
- val iter = getLocalFromDisk(id, serializer).get
- // Pass 0 as size since it's not in flight
readMetrics.localBlocksFetched += 1
- results.put(new FetchResult(id, 0, () => iter))
+ results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
logDebug("Got local block " + id)
} catch {
case e: Exception => {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1eb622c12a..cfe5b6c50a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()
-
- def getIterator: Iterator[Any] = {
- val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
- serializer.newInstance().deserializeStream(stream).asIterator
- }
-
- if (blockId.isShuffle) {
- /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
- * at the beginning. The wrapping will cost some memory (compression instance
- * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
- * wrapping lazily to save memory. */
- class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
- lazy val proxy = f
- override def hasNext: Boolean = proxy.hasNext
- override def next(): Any = proxy.next()
- }
- new LazyProxyIterator(getIterator)
- } else {
- getIterator
- }
+ val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+ serializer.newInstance().deserializeStream(stream).asIterator
}
def stop(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index 1591284383..fbfcb5156d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
iterator.initialize()
- // 3rd getLocalFromDisk invocation should be failed
- verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+ // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+ verify(blockManager, times(0)).getLocalFromDisk(any(), any())
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
// the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
- assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined")
+ assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
+ verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
- assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined")
+ assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
+ verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
// 3rd fetch should be failed
- assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined")
- assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
- // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator.
- // Otherwise, BasicBlockFetcherIterator hangs up.
+ intercept[Exception] {
+ iterator.next()
+ }
+ verify(blockManager, times(3)).getLocalFromDisk(any(), any())
}
@@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
iterator.initialize()
- // getLocalFromDis should be invoked for all of 5 blocks
- verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+ // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+ verify(blockManager, times(0)).getLocalFromDisk(any(), any())
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
@@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined")
assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
- assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+ assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+
+ verify(blockManager, times(5)).getLocalFromDisk(any(), any())
}
test("block fetch from remote fails using BasicBlockFetcherIterator") {