aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Patrick Wendell <pwendell@gmail.com> 2014-01-23 19:47:00 -0800
committer: Patrick Wendell <pwendell@gmail.com> 2014-01-23 19:47:00 -0800
commit: 3d6e75419330d27435becfdf8cfb0b6d20d56cf8 (patch)
tree: 64b0bda8b2e05821dc06f9641d4e183a2ada03a6
parent: ff44732171730fd9e5db005062a45464a3801358 (diff)
parent: 0213b4032a78d621405105365119677edc663b1b (diff)
download: spark-3d6e75419330d27435becfdf8cfb0b6d20d56cf8.tar.gz
spark-3d6e75419330d27435becfdf8cfb0b6d20d56cf8.tar.bz2
spark-3d6e75419330d27435becfdf8cfb0b6d20d56cf8.zip
Merge pull request #503 from pwendell/master
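Fix bug on read-side of external sort when using Snappy. This case wasn't handled correctly: DiskMapIterator wrapped the spill-file stream with blockManager.wrapForCompression. This patch fixes it by having the read side decompress with LZFCompressionCodec whenever blockManager.shouldCompress(blockId) is true, and read the buffered stream directly otherwise.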
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala 10
1 file changed, 9 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fb73636162..3d9b09ec33 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
- val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ val compressedStream =
+ if (shouldCompress) {
+ compressionCodec.compressedInputStream(bufferedStream)
+ } else {
+ bufferedStream
+ }
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0