diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index fb73636162..3d9b09ec33 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) - val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + + val shouldCompress = blockManager.shouldCompress(blockId) + val compressionCodec = new LZFCompressionCodec(sparkConf) + val compressedStream = + if (shouldCompress) { + compressionCodec.compressedInputStream(bufferedStream) + } else { + bufferedStream + } var deserializeStream = ser.deserializeStream(compressedStream) var objectsRead = 0 |