author     Reynold Xin <reynoldx@gmail.com>  2013-07-30 17:11:54 -0700
committer  Reynold Xin <reynoldx@gmail.com>  2013-07-30 17:11:54 -0700
commit     ad7e9d0d64277f616f90f2ca8bf8a5844641883a (patch)
tree       b4169e8ce5b24f07e4c805a4959c242d83d5b23f
parent     368c58eac55931f62677cf8fc38168a1dc9dfcec (diff)
CompressionCodec cleanup. Moved it to spark.io package.
-rw-r--r--  core/src/main/scala/spark/io/CompressionCodec.scala             82
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala            32
-rw-r--r--  core/src/main/scala/spark/storage/CompressionCodec.scala        13
-rw-r--r--  core/src/main/scala/spark/storage/LZFCompressionCodec.scala     16
-rw-r--r--  core/src/main/scala/spark/storage/SnappyCompressionCodec.scala  18
-rw-r--r--  core/src/test/scala/spark/io/CompressionCodecSuite.scala         0
6 files changed, 91 insertions(+), 70 deletions(-)
diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala
new file mode 100644
index 0000000000..2ba104a737
--- /dev/null
+++ b/core/src/main/scala/spark/io/CompressionCodec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{InputStream, OutputStream}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+
+/**
+ * CompressionCodec allows the customization of the compression implementation used in
+ * block storage.
+ */
+trait CompressionCodec {
+
+  def compressionOutputStream(s: OutputStream): OutputStream
+
+  def compressionInputStream(s: InputStream): InputStream
+}
+
+
+private[spark] object CompressionCodec {
+
+  def createCodec(): CompressionCodec = {
+    // Set the default codec to Snappy since the LZF implementation initializes a pretty large
+    // buffer for every stream, which results in a lot of memory overhead when the number of
+    // shuffle reduce buckets is large.
+    createCodec(classOf[SnappyCompressionCodec].getName)
+  }
+
+  def createCodec(codecName: String): CompressionCodec = {
+    Class.forName(
+      System.getProperty("spark.io.compression.codec", codecName),
+      true,
+      Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
+  }
+}
+
+
+/**
+ * LZF implementation of [[spark.io.CompressionCodec]].
+ */
+class LZFCompressionCodec extends CompressionCodec {
+
+  override def compressionOutputStream(s: OutputStream): OutputStream = {
+    new LZFOutputStream(s).setFinishBlockOnFlush(true)
+  }
+
+  override def compressionInputStream(s: InputStream): InputStream = new LZFInputStream(s)
+}
+
+
+/**
+ * Snappy implementation of [[spark.io.CompressionCodec]].
+ * Block size can be configured by spark.io.compression.snappy.block.size.
+ */
+class SnappyCompressionCodec extends CompressionCodec {
+
+  override def compressionOutputStream(s: OutputStream): OutputStream = {
+    val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+    new SnappyOutputStream(s, blockSize)
+  }
+
+  override def compressionInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
+}
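
A quick round-trip sketch of the relocated API (hedged: the object name and payload are illustrative only; the snippet sits in the spark.io package because the factory object is private[spark], and the spark.io.compression.codec override is optional since Snappy is the default):

package spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object CodecRoundTrip {
  def main(args: Array[String]) {
    // Optional override; createCodec() falls back to Snappy when the property is unset.
    System.setProperty("spark.io.compression.codec", classOf[LZFCompressionCodec].getName)
    val codec = CompressionCodec.createCodec()

    // Compress a small payload into an in-memory buffer.
    val payload = "block manager payload".getBytes("UTF-8")
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressionOutputStream(buffer)
    out.write(payload)
    out.close()

    // Decompress it again and verify the round trip.
    val in = codec.compressionInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val bytes = Stream.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    assert(new String(bytes, "UTF-8") == "block manager payload")
  }
}

For the Snappy codec, the block size read when each output stream is constructed can likewise be tuned through the spark.io.compression.snappy.block.size property.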
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 4228c902f8..9ed4c01218 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._

-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import spark.{Logging, SparkEnv, SparkException, Utils}
+import spark.io.CompressionCodec
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -158,7 +157,12 @@ private[spark] class BlockManager(
  val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
  initialize()

-  var compressionCodec: CompressionCodec = null
+  // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+  // the initialization of the compression codec until it is first used. The reason is that a
+  // Spark program could be using a user-defined codec in a third-party jar, which is loaded in
+  // Executor.updateDependencies. When the BlockManager is initialized, user-level jars haven't
+  // been loaded yet.
+  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()

  /**
   * Construct a BlockManager with a memory limit set based on system properties.
@@ -921,32 +925,14 @@ private[spark] class BlockManager(
   * Wrap an output stream for compression if block compression is enabled for its block type
   */
  def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
-    if (compressionCodec == null) {
-      compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec",
-        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
-        .newInstance().asInstanceOf[CompressionCodec]
-    }
-
-    if (shouldCompress(blockId)) {
-      //(new LZFOutputStream(s)).setFinishBlockOnFlush(true)
-      compressionCodec.compressionOutputStream(s)
-    } else {
-      s
-    }
+    if (shouldCompress(blockId)) compressionCodec.compressionOutputStream(s) else s
  }

  /**
   * Wrap an input stream for compression if block compression is enabled for its block type
   */
  def wrapForCompression(blockId: String, s: InputStream): InputStream = {
-    if (compressionCodec == null) {
-      compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec",
-        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
-        .newInstance().asInstanceOf[CompressionCodec]
-    }
-
-    if (shouldCompress(blockId)) /*new LZFInputStream(s) */
-      compressionCodec.compressionInputStream(s) else s
+    if (shouldCompress(blockId)) compressionCodec.compressionInputStream(s) else s
  }

  def dataSerialize(
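
The comment on the new lazy val is the crux of this hunk; a self-contained sketch (hypothetical names, runnable in a Scala REPL) of the deferred-initialization behavior it relies on:

class Holder {
  lazy val codec: String = {
    // A lazy val's body runs on first access, not at construction time; by then,
    // Executor.updateDependencies has had a chance to install user-level jars.
    println("resolving codec now")
    "spark.io.SnappyCompressionCodec"
  }
}

val h = new Holder()  // prints nothing: the codec has not been resolved yet
println(h.codec)      // prints "resolving codec now", then the codec class name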
diff --git a/core/src/main/scala/spark/storage/CompressionCodec.scala b/core/src/main/scala/spark/storage/CompressionCodec.scala
deleted file mode 100644
index cd80de33f6..0000000000
--- a/core/src/main/scala/spark/storage/CompressionCodec.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package spark.storage
-
-import java.io.{InputStream, OutputStream}
-
-
-/**
- * CompressionCodec allows the customization of the compression codec
- */
-trait CompressionCodec {
-  def compressionOutputStream(s: OutputStream): OutputStream
-
-  def compressionInputStream(s: InputStream): InputStream
-}
diff --git a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala
deleted file mode 100644
index 3328b949ef..0000000000
--- a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package spark.storage
-
-import java.io.{InputStream, OutputStream}
-
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-/**
- * LZF implementation of [[spark.storage.CompressionCodec]]
- */
-class LZFCompressionCodec extends CompressionCodec {
-  def compressionOutputStream(s: OutputStream): OutputStream =
-    (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
-
-  def compressionInputStream(s: InputStream): InputStream =
-    new LZFInputStream(s)
-}
diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala
deleted file mode 100644
index 62b00ef3f6..0000000000
--- a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package spark.storage
-
-import java.io.{InputStream, OutputStream}
-
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-
-/**
- * Snappy implementation of [[spark.storage.CompressionCodec]]
- * block size can be configured by spark.snappy.block.size
- */
-class SnappyCompressionCodec extends CompressionCodec {
-  def compressionOutputStream(s: OutputStream): OutputStream =
-    new SnappyOutputStream(s,
-      System.getProperty("spark.snappy.block.size", "32768").toInt)
-
-  def compressionInputStream(s: InputStream): InputStream =
-    new SnappyInputStream(s)
-}
diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
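
The new CompressionCodecSuite.scala is committed empty (e69de29bb2 is git's empty-blob hash). A hedged sketch of the round-trip suite it might grow into, assuming the ScalaTest FunSuite style used elsewhere in the code base:

package spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.scalatest.FunSuite

class CompressionCodecSuite extends FunSuite {

  // Compress a payload through the codec, decompress it, and expect identity.
  private def testRoundTrip(codec: CompressionCodec) {
    val payload = (0 until 1000).map(_.toByte).toArray
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressionOutputStream(buffer)
    out.write(payload)
    out.close()

    val in = codec.compressionInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val result = Stream.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    assert(result.toSeq === payload.toSeq)
  }

  test("lzf compression codec") {
    testRoundTrip(new LZFCompressionCodec())
  }

  test("snappy compression codec") {
    testRoundTrip(new SnappyCompressionCodec())
  }
}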