From 0213b4032a78d621405105365119677edc663b1b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 23 Jan 2014 17:59:42 -0800 Subject: Fix bug on read-side of external sort when using Snappy. The read path in DiskMapIterator went through blockManager.wrapForCompression, which did not handle this case correctly; it now checks blockManager.shouldCompress(blockId) and, when that is true, wraps the buffered stream with LZFCompressionCodec.compressedInputStream. --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 64e9b436f0..eb3acf4dbe 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -331,7 +331,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) - val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + + val shouldCompress = blockManager.shouldCompress(blockId) + val compressionCodec = new LZFCompressionCodec(sparkConf) + val compressedStream = + if (shouldCompress) { + compressionCodec.compressedInputStream(bufferedStream) + } else { + bufferedStream + } var deserializeStream = ser.deserializeStream(compressedStream) var objectsRead = 0 -- cgit v1.2.3