diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-23 17:59:42 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-23 18:04:55 -0800 |
commit | 0213b4032a78d621405105365119677edc663b1b (patch) | |
tree | 5592ba2d729e0a156fbc1671934eb9a088ac4ad3 | |
parent | 6285513147c78b6bedc9ea21e6f4644b1a71e8f4 (diff) | |
download | spark-0213b4032a78d621405105365119677edc663b1b.tar.gz spark-0213b4032a78d621405105365119677edc663b1b.tar.bz2 spark-0213b4032a78d621405105365119677edc663b1b.zip |
Fix bug on read-side of external sort when using Snappy.
This case wasn't handled correctly and this patch fixes it.
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 64e9b436f0..eb3acf4dbe 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -331,7 +331,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) - val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + + val shouldCompress = blockManager.shouldCompress(blockId) + val compressionCodec = new LZFCompressionCodec(sparkConf) + val compressedStream = + if (shouldCompress) { + compressionCodec.compressedInputStream(bufferedStream) + } else { + bufferedStream + } var deserializeStream = ser.deserializeStream(compressedStream) var objectsRead = 0 |