aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-07-15 01:46:57 -0700
committerReynold Xin <rxin@apache.org>2014-07-15 01:46:57 -0700
commitdd95abada78b4d0aec97dacda50fdfd74464b073 (patch)
tree823d5955b61420b64144dbf7fabee893492c1f13 /core/src/main/scala
parent7446f5ff93142d2dd5c79c63fa947f47a1d4db8b (diff)
downloadspark-dd95abada78b4d0aec97dacda50fdfd74464b073.tar.gz
spark-dd95abada78b4d0aec97dacda50fdfd74464b073.tar.bz2
spark-dd95abada78b4d0aec97dacda50fdfd74464b073.zip
[SPARK-2399] Add support for LZ4 compression.
Based on Greg Bowyer's patch from JIRA https://issues.apache.org/jira/browse/SPARK-2399 Author: Reynold Xin <rxin@apache.org> Closes #1416 from rxin/lz4 and squashes the following commits: 6c8fefe [Reynold Xin] Fixed typo. 8a14d38 [Reynold Xin] [SPARK-2399] Add support for LZ4 compression.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala22
1 files changed, 22 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 4b0fe1ab82..33402c927c 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
@@ -61,6 +62,27 @@ private[spark] object CompressionCodec {
/**
* :: DeveloperApi ::
+ * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
+ * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
+ */
+@DeveloperApi
+class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+ new LZ4BlockOutputStream(s, blockSize)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
+}
+
+
+/**
+ * :: DeveloperApi ::
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions