author     Reynold Xin <reynoldx@gmail.com>  2013-07-30 16:04:18 -0700
committer  Reynold Xin <reynoldx@gmail.com>  2013-07-30 16:04:18 -0700
commit     368c58eac55931f62677cf8fc38168a1dc9dfcec (patch)
tree       ec9ed40b524a14f964525ec9313bc445e8028a83
parent     e87de037d608231610352512b8f66d7232398374 (diff)
parent     94238aae57475030f6e88102a83c7809c5835494 (diff)
Merge branch 'lazy_file_open' of github.com:lyogavin/spark into compression

Conflicts:
    project/SparkBuild.scala
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala            | 20
-rw-r--r--  core/src/main/scala/spark/storage/CompressionCodec.scala        | 13
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala               |  1
-rw-r--r--  core/src/main/scala/spark/storage/LZFCompressionCodec.scala     | 16
-rw-r--r--  core/src/main/scala/spark/storage/SnappyCompressionCodec.scala  | 18
-rw-r--r--  project/SparkBuild.scala                                        |  1
6 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index e4ffa57ad2..4228c902f8 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -158,6 +158,8 @@ private[spark] class BlockManager(
   val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
   initialize()
 
+  var compressionCodec: CompressionCodec = null
+
   /**
    * Construct a BlockManager with a memory limit set based on system properties.
    */
@@ -919,8 +921,15 @@ private[spark] class BlockManager(
    * Wrap an output stream for compression if block compression is enabled for its block type
    */
   def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
+    if (compressionCodec == null) {
+      compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec",
+        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
+        .newInstance().asInstanceOf[CompressionCodec]
+    }
+
     if (shouldCompress(blockId)) {
-      (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
+      //(new LZFOutputStream(s)).setFinishBlockOnFlush(true)
+      compressionCodec.compressionOutputStream(s)
     } else {
       s
     }
@@ -930,7 +939,14 @@ private[spark] class BlockManager(
    * Wrap an input stream for compression if block compression is enabled for its block type
    */
   def wrapForCompression(blockId: String, s: InputStream): InputStream = {
-    if (shouldCompress(blockId)) new LZFInputStream(s) else s
+    if (compressionCodec == null) {
+      compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec",
+        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
+        .newInstance().asInstanceOf[CompressionCodec]
+    }
+
+    if (shouldCompress(blockId)) /*new LZFInputStream(s) */
+      compressionCodec.compressionInputStream(s) else s
   }
 
   def dataSerialize(
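
Note: the codec is now loaded lazily and reflectively. On first use, wrapForCompression
instantiates the class named by the spark.storage.compression.codec system property
(default: spark.storage.LZFCompressionCodec) and caches it in compressionCodec. A
minimal usage sketch (illustrative, not part of this commit), opting into Snappy
before any block is read or written:

    System.setProperty("spark.storage.compression.codec",
      "spark.storage.SnappyCompressionCodec")

Because the cached instance is never re-read, the property only takes effect if it is
set before the BlockManager wraps its first stream.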
diff --git a/core/src/main/scala/spark/storage/CompressionCodec.scala b/core/src/main/scala/spark/storage/CompressionCodec.scala
new file mode 100644
index 0000000000..cd80de33f6
--- /dev/null
+++ b/core/src/main/scala/spark/storage/CompressionCodec.scala
@@ -0,0 +1,13 @@
+package spark.storage
+
+import java.io.{InputStream, OutputStream}
+
+
+/**
+ * CompressionCodec allows the customization of the codec used to compress block data
+ */
+trait CompressionCodec {
+  def compressionOutputStream(s: OutputStream): OutputStream
+
+  def compressionInputStream(s: InputStream): InputStream
+}
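
Note: the trait is the extension point for user-supplied codecs; any class with a
no-arg constructor that implements it can be named in spark.storage.compression.codec.
A minimal sketch of a custom codec built on the JDK's GZIP streams (hypothetical, not
part of this commit):

    package spark.storage

    import java.io.{InputStream, OutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    class GZIPCompressionCodec extends CompressionCodec {
      // GZIPOutputStream compresses as bytes are written to the wrapped stream.
      def compressionOutputStream(s: OutputStream): OutputStream =
        new GZIPOutputStream(s)

      // GZIPInputStream decompresses as bytes are read from the wrapped stream.
      def compressionInputStream(s: InputStream): InputStream =
        new GZIPInputStream(s)
    }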
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3495d653bd..3ebfe173b1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     override def close() {
       if (initialized) {
         objOut.close()
-        bs.close()
         channel = null
         bs = null
         objOut = null
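
Note: the deleted bs.close() was a double close. java.io.ObjectOutputStream.close()
also closes the stream it wraps, so closing objOut already closes bs. A sketch of that
JDK behavior (the file path is hypothetical):

    import java.io.{BufferedOutputStream, FileOutputStream, ObjectOutputStream}

    val bs = new BufferedOutputStream(new FileOutputStream("/tmp/block"))
    val objOut = new ObjectOutputStream(bs)
    objOut.writeObject("payload")
    objOut.close()  // flushes and closes bs as well; a second bs.close() is redundant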
diff --git a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala
new file mode 100644
index 0000000000..3328b949ef
--- /dev/null
+++ b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import java.io.{InputStream, OutputStream}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+/**
+ * LZF implementation of [[spark.storage.CompressionCodec]]
+ */
+class LZFCompressionCodec extends CompressionCodec {
+  def compressionOutputStream(s: OutputStream): OutputStream =
+    (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
+
+  def compressionInputStream(s: InputStream): InputStream =
+    new LZFInputStream(s)
+}
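
Note: this codec preserves the previous hard-coded behavior behind the new trait,
including setFinishBlockOnFlush(true), which ends an LZF chunk whenever the stream is
flushed. A hypothetical round trip through the codec (not part of this commit):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    val codec = new LZFCompressionCodec
    val buf = new ByteArrayOutputStream()
    val out = codec.compressionOutputStream(buf)
    out.write("hello".getBytes("UTF-8"))
    out.close()

    // Read everything back through the decompressing stream.
    val in = codec.compressionInputStream(new ByteArrayInputStream(buf.toByteArray))
    val bytes = new ByteArrayOutputStream()
    val chunk = new Array[Byte](1024)
    var n = in.read(chunk)
    while (n != -1) { bytes.write(chunk, 0, n); n = in.read(chunk) }
    assert(new String(bytes.toByteArray, "UTF-8") == "hello")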
diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala
new file mode 100644
index 0000000000..62b00ef3f6
--- /dev/null
+++ b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala
@@ -0,0 +1,18 @@
+package spark.storage
+
+import java.io.{InputStream, OutputStream}
+
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+/**
+ * Snappy implementation of [[spark.storage.CompressionCodec]].
+ * Block size can be configured by the spark.snappy.block.size system property.
+ */
+class SnappyCompressionCodec extends CompressionCodec {
+  def compressionOutputStream(s: OutputStream): OutputStream =
+    new SnappyOutputStream(s,
+      System.getProperty("spark.snappy.block.size", "32768").toInt)
+
+  def compressionInputStream(s: InputStream): InputStream =
+    new SnappyInputStream(s)
+}
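
Note: the second SnappyOutputStream constructor argument is its internal block size in
bytes, read once per stream from spark.snappy.block.size (default 32768, i.e. 32 KB).
A hypothetical override doubling the buffer:

    System.setProperty("spark.snappy.block.size", "65536")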
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9920e00a67..be3ef1f148 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -168,6 +168,7 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"commons-daemon" % "commons-daemon" % "1.0.10",
"com.ning" % "compress-lzf" % "0.8.4",
+ "org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),