diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-23 19:47:00 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-23 19:47:00 -0800 |
commit | 3d6e75419330d27435becfdf8cfb0b6d20d56cf8 (patch) | |
tree | 64b0bda8b2e05821dc06f9641d4e183a2ada03a6 | |
parent | ff44732171730fd9e5db005062a45464a3801358 (diff) | |
parent | 0213b4032a78d621405105365119677edc663b1b (diff) | |
download | spark-3d6e75419330d27435becfdf8cfb0b6d20d56cf8.tar.gz spark-3d6e75419330d27435becfdf8cfb0b6d20d56cf8.tar.bz2 spark-3d6e75419330d27435becfdf8cfb0b6d20d56cf8.zip |
Merge pull request #503 from pwendell/master
Fix bug on read-side of external sort when using Snappy.
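The read side was not handled correctly in this case: DiskMapIterator wrapped the file stream with blockManager.wrapForCompression. This patch fixes it by decompressing with an explicit LZFCompressionCodec whenever blockManager.shouldCompress(blockId) is true, and reading the buffered stream directly otherwise.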
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 10 |
1 file changed, 9 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fb73636162..3d9b09ec33 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
-    val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    val compressedStream =
+      if (shouldCompress) {
+        compressionCodec.compressedInputStream(bufferedStream)
+      } else {
+        bufferedStream
+      }
     var deserializeStream = ser.deserializeStream(compressedStream)
     var objectsRead = 0
 