about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  22
1 files changed, 20 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6e450081dc..a41286d3e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1015,8 +1015,26 @@ private[spark] class BlockManager(
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()
- val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
- serializer.newInstance().deserializeStream(stream).asIterator
+
+ def getIterator = {
+ val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+ serializer.newInstance().deserializeStream(stream).asIterator
+ }
+
+ if (blockId.isShuffle) {
+ // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+ // at the beginning. The wrapping will cost some memory (compression instance
+ // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
+ // wrapping lazily to save memory.
+ class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+ lazy val proxy = f
+ override def hasNext: Boolean = proxy.hasNext
+ override def next(): Any = proxy.next()
+ }
+ new LazyProxyIterator(getIterator)
+ } else {
+ getIterator
+ }
}
def stop() {