author     Davies Liu <davies@databricks.com>        2015-12-21 14:21:43 -0800
committer  Davies Liu <davies.liu@gmail.com>         2015-12-21 14:21:43 -0800
commit     29cecd4a42f6969613e5b2a40f2724f99e7eec01 (patch)
tree       261b40272c991649ce584b7cec2056dcd2b6cf1c /core/src
parent     d655d37ddf59d7fb6db529324ac8044d53b2622a (diff)
[SPARK-12388] change default compression to lz4
According to the benchmark [1], LZ4-java can be 30% to 80% faster than Snappy, depending on the run. After switching the compressor to LZ4, I saw a 20% improvement in end-to-end time for a TPC-DS query (Q4).

[1] https://github.com/ning/jvm-compressor-benchmark/wiki

cc rxin

Author: Davies Liu <davies@databricks.com>

Closes #10342 from davies/lz4.
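For orientation, here is a minimal sketch of what the new default means in practice. Note that the CompressionCodec object is private[spark], so like the test suite changed below, this code would have to live under the org.apache.spark package; the config key and short codec names are the ones defined in the diff that follows:

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    // With no explicit setting, the default codec is now LZ4.
    val conf = new SparkConf()
    val codec = CompressionCodec.createCodec(conf)  // LZ4CompressionCodec

    // The previous default stays one config switch away.
    conf.set("spark.io.compression.codec", "snappy")
    val snappy = CompressionCodec.createCodec(conf)  // SnappyCompressionCodec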
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/io/CompressionCodec.scala        12
-rw-r--r--  core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java     263
-rw-r--r--  core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala    8
3 files changed, 272 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index ca74eedf89..717804626f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,10 +17,10 @@
package org.apache.spark.io
-import java.io.{IOException, InputStream, OutputStream}
+import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.LZ4BlockOutputStream
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
@@ -49,7 +49,8 @@ private[spark] object CompressionCodec {
private val configKey = "spark.io.compression.codec"
private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
- codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
+ (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
+ || codec.isInstanceOf[LZ4CompressionCodec])
}
private val shortCompressionCodecNames = Map(
@@ -92,12 +93,11 @@ private[spark] object CompressionCodec {
}
}
- val FALLBACK_COMPRESSION_CODEC = "lzf"
- val DEFAULT_COMPRESSION_CODEC = "snappy"
+ val FALLBACK_COMPRESSION_CODEC = "snappy"
+ val DEFAULT_COMPRESSION_CODEC = "lz4"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
-
/**
* :: DeveloperApi ::
* LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
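To see what the supportsConcatenationOfSerializedStreams change above buys, here is a hedged sketch of the property that the suite's testConcatenationOfSerializedStreams helper (exercised in the test diff below) checks: two payloads compressed independently can be concatenated byte-for-byte and read back through a single decompressing stream. The compress helper is illustrative, not Spark API, and as with the previous sketch this would need to sit in the org.apache.spark namespace:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}

    // Compress one string into a standalone LZ4 stream.
    def compress(codec: CompressionCodec, s: String): Array[Byte] = {
      val bytes = new ByteArrayOutputStream()
      val out = codec.compressedOutputStream(bytes)
      out.write(s.getBytes("UTF-8"))
      out.close()
      bytes.toByteArray
    }

    val conf = new SparkConf()
    val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)

    // Concatenate two independently compressed payloads...
    val joined = compress(codec, "abc") ++ compress(codec, "def")

    // ...and decode them through a single stream, which the modified
    // LZ4BlockInputStream below makes possible for LZ4.
    val in = codec.compressedInputStream(new ByteArrayInputStream(joined))
    val decoded = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    assert(new String(decoded, "UTF-8") == "abcdef")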
diff --git a/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java b/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java
new file mode 100644
index 0000000000..27b6f0d4a3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java
@@ -0,0 +1,263 @@
+package org.apache.spark.io;
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Checksum;
+
+import net.jpountz.lz4.LZ4BlockOutputStream;
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.util.SafeUtils;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * {@link InputStream} implementation to decode data written with
+ * {@link LZ4BlockOutputStream}. This class is not thread-safe and does not
+ * support {@link #mark(int)}/{@link #reset()}.
+ * @see LZ4BlockOutputStream
+ *
+ * This is based on net.jpountz.lz4.LZ4BlockInputStream
+ *
+ * changes: https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411
+ *
+ * TODO: merge this into upstream
+ */
+public final class LZ4BlockInputStream extends FilterInputStream {
+
+ // Copied from net.jpountz.lz4.LZ4BlockOutputStream
+ static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
+ static final int MAGIC_LENGTH = MAGIC.length;
+
+ static final int HEADER_LENGTH =
+ MAGIC_LENGTH // magic bytes
+ + 1 // token
+ + 4 // compressed length
+ + 4 // decompressed length
+ + 4; // checksum
+
+ static final int COMPRESSION_LEVEL_BASE = 10;
+
+ static final int COMPRESSION_METHOD_RAW = 0x10;
+ static final int COMPRESSION_METHOD_LZ4 = 0x20;
+
+ static final int DEFAULT_SEED = 0x9747b28c;
+
+ private final LZ4FastDecompressor decompressor;
+ private final Checksum checksum;
+ private byte[] buffer;
+ private byte[] compressedBuffer;
+ private int originalLen;
+ private int o;
+ private boolean finished;
+
+ /**
+ * Create a new {@link InputStream}.
+ *
+ * @param in the {@link InputStream} to poll
+ * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
+ * use
+ * @param checksum the {@link Checksum} instance to use, must be
+ * equivalent to the instance which has been used to
+ * write the stream
+ */
+ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) {
+ super(in);
+ this.decompressor = decompressor;
+ this.checksum = checksum;
+ this.buffer = new byte[0];
+ this.compressedBuffer = new byte[HEADER_LENGTH];
+ o = originalLen = 0;
+ finished = false;
+ }
+
+ /**
+ * Create a new instance using {@link XXHash32} for checksumming.
+ * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum)
+ * @see StreamingXXHash32#asChecksum()
+ */
+ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
+ this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
+ }
+
+ /**
+ * Create a new instance which uses the fastest {@link LZ4FastDecompressor} available.
+ * @see LZ4Factory#fastestInstance()
+ * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor)
+ */
+ public LZ4BlockInputStream(InputStream in) {
+ this(in, LZ4Factory.fastestInstance().fastDecompressor());
+ }
+
+ @Override
+ public int available() throws IOException {
+ refill();
+ return originalLen - o;
+ }
+
+ @Override
+ public int read() throws IOException {
+ refill();
+ if (finished) {
+ return -1;
+ }
+ return buffer[o++] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ SafeUtils.checkRange(b, off, len);
+ refill();
+ if (finished) {
+ return -1;
+ }
+ len = Math.min(len, originalLen - o);
+ System.arraycopy(buffer, o, b, off, len);
+ o += len;
+ return len;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ refill();
+ if (finished) {
+ return -1;
+ }
+ final int skipped = (int) Math.min(n, originalLen - o);
+ o += skipped;
+ return skipped;
+ }
+
+ private void refill() throws IOException {
+ if (finished || o < originalLen) {
+ return;
+ }
+ try {
+ readFully(compressedBuffer, HEADER_LENGTH);
+ } catch (EOFException e) {
+ finished = true;
+ return;
+ }
+ for (int i = 0; i < MAGIC_LENGTH; ++i) {
+ if (compressedBuffer[i] != MAGIC[i]) {
+ throw new IOException("Stream is corrupted");
+ }
+ }
+ final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF;
+ final int compressionMethod = token & 0xF0;
+ final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F);
+ if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4)
+ {
+ throw new IOException("Stream is corrupted");
+ }
+ final int compressedLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1);
+ originalLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5);
+ final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
+ assert HEADER_LENGTH == MAGIC_LENGTH + 13;
+ if (originalLen > 1 << compressionLevel
+ || originalLen < 0
+ || compressedLen < 0
+ || (originalLen == 0 && compressedLen != 0)
+ || (originalLen != 0 && compressedLen == 0)
+ || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
+ throw new IOException("Stream is corrupted");
+ }
+ if (originalLen == 0 && compressedLen == 0) {
+ if (check != 0) {
+ throw new IOException("Stream is corrupted");
+ }
+ refill();
+ return;
+ }
+ if (buffer.length < originalLen) {
+ buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
+ }
+ switch (compressionMethod) {
+ case COMPRESSION_METHOD_RAW:
+ readFully(buffer, originalLen);
+ break;
+ case COMPRESSION_METHOD_LZ4:
+ if (compressedBuffer.length < originalLen) {
+ compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
+ }
+ readFully(compressedBuffer, compressedLen);
+ try {
+ final int compressedLen2 =
+ decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
+ if (compressedLen != compressedLen2) {
+ throw new IOException("Stream is corrupted");
+ }
+ } catch (LZ4Exception e) {
+ throw new IOException("Stream is corrupted", e);
+ }
+ break;
+ default:
+ throw new AssertionError();
+ }
+ checksum.reset();
+ checksum.update(buffer, 0, originalLen);
+ if ((int) checksum.getValue() != check) {
+ throw new IOException("Stream is corrupted");
+ }
+ o = 0;
+ }
+
+ private void readFully(byte[] b, int len) throws IOException {
+ int read = 0;
+ while (read < len) {
+ final int r = in.read(b, read, len - read);
+ if (r < 0) {
+ throw new EOFException("Stream ended prematurely");
+ }
+ read += r;
+ }
+ assert len == read;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @SuppressWarnings("sync-override")
+ @Override
+ public void mark(int readlimit) {
+ // unsupported
+ }
+
+ @SuppressWarnings("sync-override")
+ @Override
+ public void reset() throws IOException {
+ throw new IOException("mark/reset not supported");
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(in=" + in
+ + ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
+ }
+
+}
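Finally, a minimal round-trip sketch for the class above, pairing it with net.jpountz.lz4.LZ4BlockOutputStream, which writes the block format this stream decodes. The only names used are those visible in the file; buffer sizes and the sample payload are arbitrary:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import net.jpountz.lz4.LZ4BlockOutputStream
    import org.apache.spark.io.LZ4BlockInputStream

    val sink = new ByteArrayOutputStream()
    val out = new LZ4BlockOutputStream(sink)  // default block size and checksum
    out.write("hello lz4".getBytes("UTF-8"))
    out.close()  // flushes the final block

    // The one-arg constructor picks the fastest decompressor and an XXHash32
    // checksum seeded with DEFAULT_SEED, matching what the writer produced.
    val in = new LZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray))
    val buf = new Array[Byte](64)
    val n = in.read(buf)
    assert(new String(buf, 0, n, "UTF-8") == "hello lz4")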
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 1553ab60bd..9e9c2b0165 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -46,7 +46,7 @@ class CompressionCodecSuite extends SparkFunSuite {
test("default compression codec") {
val codec = CompressionCodec.createCodec(conf)
- assert(codec.getClass === classOf[SnappyCompressionCodec])
+ assert(codec.getClass === classOf[LZ4CompressionCodec])
testCodec(codec)
}
@@ -62,12 +62,10 @@ class CompressionCodecSuite extends SparkFunSuite {
testCodec(codec)
}
- test("lz4 does not support concatenation of serialized streams") {
+ test("lz4 supports concatenation of serialized streams") {
val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
assert(codec.getClass === classOf[LZ4CompressionCodec])
- intercept[Exception] {
- testConcatenationOfSerializedStreams(codec)
- }
+ testConcatenationOfSerializedStreams(codec)
}
test("lzf compression codec") {