aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala10
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fb73636162..3d9b09ec33 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
- val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ val compressedStream =
+ if (shouldCompress) {
+ compressionCodec.compressedInputStream(bufferedStream)
+ } else {
+ bufferedStream
+ }
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0