author    Wenchen Fan(Cloud) <cloud0fan@gmail.com>    2014-06-03 13:18:20 -0700
committer Matei Zaharia <matei@databricks.com>    2014-06-03 13:18:20 -0700
commit    45e9bc85db231e84a23b8d757136023eabcec13e (patch)
tree      e802fd86c7331d30048d9e701f7f46bea29a8485
parent    6c044ed100081f1fa7c8df4bd4b58b65a61c3360 (diff)
[SPARK-1912] fix compress memory issue during reduce
When we need to read a compressed block, we first create a compression stream instance (LZF or Snappy) and use it to wrap that block. Say a reducer task needs to read 1000 local shuffle blocks: it prepares all 1000 blocks up front, which means creating 1000 compression stream instances to wrap them. Initializing each compression instance allocates some memory, so having many of them alive at the same time is a problem. In practice the reducer reads the shuffle blocks one by one, so we can initialize each compression instance lazily.

Author: Wenchen Fan(Cloud) <cloud0fan@gmail.com>

Closes #860 from cloud-fan/fix-compress and squashes the following commits:

0924a6b [Wenchen Fan(Cloud)] rename 'doWork' into 'getIterator'
07f32c2 [Wenchen Fan(Cloud)] move the LazyProxyIterator to dataDeserialize
d80c426 [Wenchen Fan(Cloud)] remove empty lines in short class
2c8adb2 [Wenchen Fan(Cloud)] add inline comment
8ebff77 [Wenchen Fan(Cloud)] fix compress memory issue during reduce
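Below is a minimal, self-contained sketch of the same idea outside Spark, to make the lazy-wrapping behaviour concrete. It is not Spark's actual code: GZIP stands in for the LZF/Snappy streams, and LazyWrapDemo, makeBlock and readBlock are made-up names for the demo. Only LazyProxyIterator mirrors the class introduced by the patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object LazyWrapDemo {

  // Same shape as the class added by the patch: the by-name parameter `f`
  // is only evaluated when `proxy` is first forced, i.e. on the first
  // hasNext/next call.
  class LazyProxyIterator[A](f: => Iterator[A]) extends Iterator[A] {
    lazy val proxy = f
    override def hasNext: Boolean = proxy.hasNext
    override def next(): A = proxy.next()
  }

  // Compress one "block" so there is something to decompress later.
  def makeBlock(payload: String): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(payload.getBytes("UTF-8"))
    gz.close()
    bos.toByteArray
  }

  // Eager wrapping: creating the decompression stream allocates its buffers now.
  def readBlock(block: Array[Byte]): Iterator[String] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(block))
    scala.io.Source.fromInputStream(in).getLines()
  }

  def main(args: Array[String]): Unit = {
    val blocks = (1 to 1000).map(i => makeBlock(s"record-$i"))

    // All 1000 blocks are wrapped up front, but because the wrapping is lazy,
    // a block's decompression stream (and its buffers) is created only when
    // that block is actually read, rather than 1000 streams being created here.
    val iters = blocks.map(b => new LazyProxyIterator(readBlock(b)))
    iters.foreach(it => it.foreach(println))
  }
}

With the eager variant (readBlock(b) used directly instead of new LazyProxyIterator(readBlock(b))), the map call alone would allocate 1000 decompression streams before a single record is read, which is the memory spike this patch avoids.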
-rw-r--r--    core/src/main/scala/org/apache/spark/storage/BlockManager.scala    22
1 file changed, 20 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6e450081dc..a41286d3e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1015,8 +1015,26 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+
+    def getIterator = {
+      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+      serializer.newInstance().deserializeStream(stream).asIterator
+    }
+
+    if (blockId.isShuffle) {
+      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+      // at the beginning. The wrapping will cost some memory (compression instance
+      // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
+      // wrapping lazily to save memory.
+      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+        lazy val proxy = f
+        override def hasNext: Boolean = proxy.hasNext
+        override def next(): Any = proxy.next()
+      }
+      new LazyProxyIterator(getIterator)
+    } else {
+      getIterator
+    }
   }
 
   def stop() {