diff options
author | Reynold Xin <reynoldx@gmail.com> | 2013-07-30 16:04:18 -0700 |
---|---|---|
committer | Reynold Xin <reynoldx@gmail.com> | 2013-07-30 16:04:18 -0700 |
commit | 368c58eac55931f62677cf8fc38168a1dc9dfcec (patch) | |
tree | ec9ed40b524a14f964525ec9313bc445e8028a83 | |
parent | e87de037d608231610352512b8f66d7232398374 (diff) | |
parent | 94238aae57475030f6e88102a83c7809c5835494 (diff) | |
download | spark-368c58eac55931f62677cf8fc38168a1dc9dfcec.tar.gz spark-368c58eac55931f62677cf8fc38168a1dc9dfcec.tar.bz2 spark-368c58eac55931f62677cf8fc38168a1dc9dfcec.zip |
Merge branch 'lazy_file_open' of github.com:lyogavin/spark into compression
Conflicts:
project/SparkBuild.scala
6 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index e4ffa57ad2..4228c902f8 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -158,6 +158,8 @@ private[spark] class BlockManager( val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks) initialize() + var compressionCodec: CompressionCodec = null + /** * Construct a BlockManager with a memory limit set based on system properties. */ @@ -919,8 +921,15 @@ private[spark] class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { + if (compressionCodec == null) { + compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", + "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) + .newInstance().asInstanceOf[CompressionCodec] + } + if (shouldCompress(blockId)) { - (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + //(new LZFOutputStream(s)).setFinishBlockOnFlush(true) + compressionCodec.compressionOutputStream(s) } else { s } @@ -930,7 +939,14 @@ private[spark] class BlockManager( * Wrap an input stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: InputStream): InputStream = { - if (shouldCompress(blockId)) new LZFInputStream(s) else s + if (compressionCodec == null) { + compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", + "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) + .newInstance().asInstanceOf[CompressionCodec] + } + + if (shouldCompress(blockId)) /*new LZFInputStream(s) */ + compressionCodec.compressionInputStream(s) else s } def dataSerialize( diff --git 
a/core/src/main/scala/spark/storage/CompressionCodec.scala b/core/src/main/scala/spark/storage/CompressionCodec.scala new file mode 100644 index 0000000000..cd80de33f6 --- /dev/null +++ b/core/src/main/scala/spark/storage/CompressionCodec.scala @@ -0,0 +1,13 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + + +/** + * CompressionCodec allows the customization of the compression codec + */ +trait CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream + + def compressionInputStream(s: InputStream): InputStream +} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 3495d653bd..3ebfe173b1 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def close() { if (initialized) { objOut.close() - bs.close() channel = null bs = null objOut = null diff --git a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala new file mode 100644 index 0000000000..3328b949ef --- /dev/null +++ b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala @@ -0,0 +1,16 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} + +/** + * LZF implementation of [[spark.storage.CompressionCodec]] + */ +class LZFCompressionCodec extends CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream = + (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + + def compressionInputStream(s: InputStream): InputStream = + new LZFInputStream(s) +} diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala new file mode 100644 index 0000000000..62b00ef3f6 --- /dev/null +++ 
b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala @@ -0,0 +1,18 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} + +/** + * Snappy implementation of [[spark.storage.CompressionCodec]] + * block size can be configured by spark.snappy.block.size + */ +class SnappyCompressionCodec extends CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream = + new SnappyOutputStream(s, + System.getProperty("spark.snappy.block.size", "32768").toInt) + + def compressionInputStream(s: InputStream): InputStream = + new SnappyInputStream(s) +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 9920e00a67..be3ef1f148 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -168,6 +168,7 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "commons-daemon" % "commons-daemon" % "1.0.10", "com.ning" % "compress-lzf" % "0.8.4", + "org.xerial.snappy" % "snappy-java" % "1.0.5", "org.ow2.asm" % "asm" % "4.0", "com.google.protobuf" % "protobuf-java" % "2.4.1", "com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty), |