about summary refs log tree commit diff
diff options
context:
space:
mode:
author Patrick Wendell <pwendell@gmail.com> 2014-01-23 17:59:42 -0800
committer Patrick Wendell <pwendell@gmail.com> 2014-01-23 18:04:55 -0800
commit 0213b4032a78d621405105365119677edc663b1b (patch)
tree 5592ba2d729e0a156fbc1671934eb9a088ac4ad3
parent 6285513147c78b6bedc9ea21e6f4644b1a71e8f4 (diff)
download spark-0213b4032a78d621405105365119677edc663b1b.tar.gz
spark-0213b4032a78d621405105365119677edc663b1b.tar.bz2
spark-0213b4032a78d621405105365119677edc663b1b.zip
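Fix bug on read-side of external sort when using Snappy.
This case wasn't handled correctly and this patch fixes it: DiskMapIterator no longer calls blockManager.wrapForCompression on the spill-file stream. Instead it checks blockManager.shouldCompress(blockId) and, when that is true, wraps the buffered stream with LZFCompressionCodec explicitly; otherwise it reads the buffered stream as-is.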
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala 10
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b436f0..eb3acf4dbe 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -331,7 +331,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
- val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ val compressedStream =
+ if (shouldCompress) {
+ compressionCodec.compressedInputStream(bufferedStream)
+ } else {
+ bufferedStream
+ }
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0