diff options
author | Adam Roberts <aroberts@uk.ibm.com> | 2015-11-04 14:03:31 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-04 14:03:31 -0800 |
commit | 701fb5052080fa8c0a79ad7c1e65693ccf444787 (patch) | |
tree | 239ed6770b744d3dc52a9d675b42cbfd090bc3bc | |
parent | d19f4fda63d0800a85b3e1c8379160bbbf17b6a3 (diff) | |
download | spark-701fb5052080fa8c0a79ad7c1e65693ccf444787.tar.gz spark-701fb5052080fa8c0a79ad7c1e65693ccf444787.tar.bz2 spark-701fb5052080fa8c0a79ad7c1e65693ccf444787.zip |
[SPARK-10949] Update Snappy version to 1.1.2
This is an updated version of #8995 by a-roberts. Original description follows:
Snappy now supports concatenation of serialized streams, this patch contains a version number change and the "does not support" test is now a "supports" test.
Snappy 1.1.2 changelog mentions:
> snappy-java-1.1.2 (22 September 2015)
> This is a backward compatible release for 1.1.x.
> Add AIX (32-bit) support.
> There is no upgrade for the native libraries of the other platforms.
> A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s)
> snappy-java-1.1.2-RC2 (18 May 2015)
> Fix #107: SnappyOutputStream.close() is not idempotent
> snappy-java-1.1.2-RC1 (13 May 2015)
> SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream
> There has been no compressed format change since 1.0.5.x. So you can read the compressed results > interchangeably between these versions.
> Fixes a problem when java.io.tmpdir does not exist.
Closes #8995.
Author: Adam Roberts <aroberts@uk.ibm.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9439 from JoshRosen/update-snappy.
4 files changed, 10 insertions, 7 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index e19b378642..6a0a89e81c 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -254,8 +254,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true); - final boolean fastMergeIsSupported = - !compressionEnabled || compressionCodec instanceof LZFCompressionCodec; + final boolean fastMergeIsSupported = !compressionEnabled || + CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); try { if (spills.length == 0) { new FileOutputStream(outputFile).close(); // Create an empty file diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 9dc36704a6..ca74eedf89 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -47,6 +47,11 @@ trait CompressionCodec { private[spark] object CompressionCodec { private val configKey = "spark.io.compression.codec" + + private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { + codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] + } + private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index cbdb33c89d..1553ab60bd 100644 --- 
a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -100,12 +100,10 @@ class CompressionCodecSuite extends SparkFunSuite { testCodec(codec) } - test("snappy does not support concatenation of serialized streams") { + test("snappy supports concatenation of serialized streams") { val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName) assert(codec.getClass === classOf[SnappyCompressionCodec]) - intercept[Exception] { - testConcatenationOfSerializedStreams(codec) - } + testConcatenationOfSerializedStreams(codec) } test("bad compression codec") { @@ -165,7 +165,7 @@ <jline.groupid>org.scala-lang</jline.groupid> <codehaus.jackson.version>1.9.13</codehaus.jackson.version> <fasterxml.jackson.version>2.4.4</fasterxml.jackson.version> - <snappy.version>1.1.1.7</snappy.version> + <snappy.version>1.1.2</snappy.version> <netlib.java.version>1.1.2</netlib.java.version> <calcite.version>1.2.0-incubating</calcite.version> <commons-codec.version>1.10</commons-codec.version> |