aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Roberts <aroberts@uk.ibm.com>2015-11-04 14:03:31 -0800
committerReynold Xin <rxin@databricks.com>2015-11-04 14:03:31 -0800
commit701fb5052080fa8c0a79ad7c1e65693ccf444787 (patch)
tree239ed6770b744d3dc52a9d675b42cbfd090bc3bc
parentd19f4fda63d0800a85b3e1c8379160bbbf17b6a3 (diff)
downloadspark-701fb5052080fa8c0a79ad7c1e65693ccf444787.tar.gz
spark-701fb5052080fa8c0a79ad7c1e65693ccf444787.tar.bz2
spark-701fb5052080fa8c0a79ad7c1e65693ccf444787.zip
[SPARK-10949] Update Snappy version to 1.1.2
This is an updated version of #8995 by a-roberts. Original description follows: Snappy now supports concatenation of serialized streams, this patch contains a version number change and the "does not support" test is now a "supports" test. Snappy 1.1.2 changelog mentions: > snappy-java-1.1.2 (22 September 2015) > This is a backward compatible release for 1.1.x. > Add AIX (32-bit) support. > There is no upgrade for the native libraries of the other platforms. > A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s) > snappy-java-1.1.2-RC2 (18 May 2015) > Fix #107: SnappyOutputStream.close() is not idempotent > snappy-java-1.1.2-RC1 (13 May 2015) > SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream > There has been no compressed format change since 1.0.5.x. So you can read the compressed results > interchangeably between these versions. > Fixes a problem when java.io.tmpdir does not exist. Closes #8995. Author: Adam Roberts <aroberts@uk.ibm.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #9439 from JoshRosen/update-snappy.
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java4
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala6
-rw-r--r--pom.xml2
4 files changed, 10 insertions, 7 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index e19b378642..6a0a89e81c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -254,8 +254,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
- final boolean fastMergeIsSupported =
- !compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
+ final boolean fastMergeIsSupported = !compressionEnabled ||
+ CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
try {
if (spills.length == 0) {
new FileOutputStream(outputFile).close(); // Create an empty file
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 9dc36704a6..ca74eedf89 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -47,6 +47,11 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
private val configKey = "spark.io.compression.codec"
+
+ private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
+ codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
+ }
+
private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index cbdb33c89d..1553ab60bd 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -100,12 +100,10 @@ class CompressionCodecSuite extends SparkFunSuite {
testCodec(codec)
}
- test("snappy does not support concatenation of serialized streams") {
+ test("snappy supports concatenation of serialized streams") {
val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
- intercept[Exception] {
- testConcatenationOfSerializedStreams(codec)
- }
+ testConcatenationOfSerializedStreams(codec)
}
test("bad compression codec") {
diff --git a/pom.xml b/pom.xml
index 762bfc7282..f5a3e44fc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,7 +165,7 @@
<jline.groupid>org.scala-lang</jline.groupid>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
- <snappy.version>1.1.1.7</snappy.version>
+ <snappy.version>1.1.2</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<calcite.version>1.2.0-incubating</calcite.version>
<commons-codec.version>1.10</commons-codec.version>