author     Matei Zaharia <matei@eecs.berkeley.edu>    2012-09-27 18:45:44 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2012-09-27 18:45:44 -0700
commit     009b0e37e7c284c531cc3c44d0e5b5b1476f1666 (patch)
tree       3983dc05e6daea8276e2b954c3b5992f142dc07c /core/src/main
parent     7bcb08cef5e6438ce8c8efa3da3a8f94f2a1fbf9 (diff)
Added an option to compress blocks in the block store
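
The new block-store compression is controlled by a JVM system property read when the BlockManager is constructed, and it is off by default. A minimal sketch of turning it on (the property name comes from this patch; setting it via System.setProperty before the BlockManager is created is an assumption about the setup flow):

    // Sketch: enable LZF compression of blocks in the block store.
    // "spark.blockManager.compress" defaults to "false" in this patch.
    System.setProperty("spark.blockManager.compress", "true")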
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala |  2 +-
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala    | 26 ++++++++++++++++++++++------
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala      |  3 ++-
3 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index a98cbb6994..64037fb2d5 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -65,7 +65,7 @@ private object HttpBroadcast extends Logging {
     synchronized {
       if (!initialized) {
         bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-        compress = System.getProperty("spark.compress", "false").toBoolean
+        compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
         if (isMaster) {
           createServer()
         }
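
Broadcast compression now has its own property, spark.broadcast.compress, instead of the old spark.compress, and it defaults to true. A small sketch of opting out (hypothetical usage; it assumes the property is set before HttpBroadcast.initialize runs):

    // Sketch: disable compression of HTTP broadcast data (new default is "true").
    System.setProperty("spark.broadcast.compress", "false")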
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 15748b70d5..bae5c8c567 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -5,7 +5,7 @@ import akka.util.Duration
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
 import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
@@ -15,6 +15,7 @@ import scala.collection.JavaConversions._
 import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
 import spark.network._
 import spark.util.ByteBufferInputStream
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
 class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
@@ -76,7 +77,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   // TODO: This will be removed after cacheTracker is removed from the code base.
   var cacheTracker: CacheTracker = null
 
-  val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties()
+  val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
+
+  val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
 
   initLogging()
@@ -605,10 +608,21 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     }
   }
 
+  /** Wrap an output stream for compression if block compression is enabled */
+  def wrapForCompression(s: OutputStream): OutputStream = {
+    if (compress) new LZFOutputStream(s) else s
+  }
+
+  /** Wrap an input stream for compression if block compression is enabled */
+  def wrapForCompression(s: InputStream): InputStream = {
+    if (compress) new LZFInputStream(s) else s
+  }
+
   def dataSerialize(values: Iterator[Any]): ByteBuffer = {
     /*serializer.newInstance().serializeMany(values)*/
     val byteStream = new FastByteArrayOutputStream(4096)
-    serializer.newInstance().serializeStream(byteStream).writeAll(values).close()
+    val ser = serializer.newInstance()
+    ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close()
     byteStream.trim()
     ByteBuffer.wrap(byteStream.array)
   }
@@ -616,7 +630,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
     val ser = serializer.newInstance()
-    return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator
+    return ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes))).asIterator
   }
 
   def stop() {
@@ -630,11 +644,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 object BlockManager {
-  def getNumParallelFetchesFromSystemProperties(): Int = {
+  def getNumParallelFetchesFromSystemProperties: Int = {
     System.getProperty("spark.blockManager.parallelFetches", "4").toInt
   }
 
-  def getMaxMemoryFromSystemProperties(): Long = {
+  def getMaxMemoryFromSystemProperties: Long = {
     val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
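
Both the serialize and deserialize paths go through wrapForCompression, so the writer and reader always agree on whether a block is LZF-compressed. A self-contained sketch of that round trip using the same com.ning.compress.lzf streams the patch imports (the sample data and buffer handling are illustrative, not part of the patch):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

    // Write through an LZF stream, as dataSerialize does when compress is on.
    val raw = new ByteArrayOutputStream()
    val out = new LZFOutputStream(raw)
    out.write("some block data".getBytes("UTF-8"))
    out.close()

    // Read back through the matching LZF stream, as dataDeserialize does.
    val in = new LZFInputStream(new ByteArrayInputStream(raw.toByteArray))
    val buf = new Array[Byte](1024)
    val n = in.read(buf)
    println(new String(buf, 0, n, "UTF-8"))  // prints "some block data"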
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index d505df66a7..7696aa9567 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -233,7 +233,8 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to write values for block " + blockId)
val file = createFile(blockId)
- val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+ val fileOut = blockManager.wrapForCompression(
+ new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
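
Only the write path of DiskStore changes here; a reader of these blocks would need the matching wrapForCompression on the input side. A hedged sketch of what that counterpart could look like (getFile and the overall getValues shape are assumptions for illustration, not part of this diff):

    // Hypothetical read-path counterpart inside DiskStore (not in this patch).
    import java.io.FileInputStream
    import it.unimi.dsi.fastutil.io.FastBufferedInputStream

    def getValues(blockId: String): Iterator[Any] = {
      val file = getFile(blockId)  // assumed helper that locates the block's file
      val fileIn = blockManager.wrapForCompression(
        new FastBufferedInputStream(new FileInputStream(file)))
      blockManager.serializer.newInstance().deserializeStream(fileIn).asIterator
    }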