author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-07 11:30:53 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-07 11:30:53 -0700
commit    efc5423210d1aadeaea78273a4a8f10425753079
tree      86e0ca94f41ef5a17b92be3d7be45b77f762e1f8
parent    039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3
Made compression configurable separately for shuffle, broadcast and RDDs
Diffstat (limited to 'core/src/main')
 core/src/main/scala/spark/storage/BlockManager.scala | 48
 core/src/main/scala/spark/storage/DiskStore.scala    |  4
 core/src/main/scala/spark/storage/MemoryStore.scala  |  8
 3 files changed, 38 insertions(+), 22 deletions(-)
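
For context before the patch itself: in this version of Spark, settings are read from Java system properties when the BlockManager is constructed, so they must be set before the SparkContext is created. A minimal sketch of tuning the three new flags (property names and defaults are taken from the patch below; the master URL and job name are illustrative):

```scala
import spark.SparkContext

object CompressionConfig {
  def main(args: Array[String]) {
    // Must run before the SparkContext (and its BlockManager) is created,
    // since these properties are read once at startup.
    System.setProperty("spark.shuffle.compress", "false")   // default: true
    System.setProperty("spark.broadcast.compress", "true")  // default: true
    System.setProperty("spark.rdd.compress", "true")        // default: false

    val sc = new SparkContext("local", "CompressionConfig")
    // ... run jobs; serialized RDD blocks are now LZF-compressed,
    // while shuffle outputs are written uncompressed.
    sc.stop()
  }
}
```
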
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 6c568cc2b0..91b7bebfb3 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -109,7 +109,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
- val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
+ val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
+ // Whether to compress RDD partitions that are stored serialized
+ val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
initialize()
@@ -252,7 +256,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
- return Some(dataDeserialize(bytes))
+ return Some(dataDeserialize(blockId, bytes))
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
@@ -355,7 +359,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port))
if (data != null) {
logDebug("Data is not null: " + data)
- return Some(dataDeserialize(data))
+ return Some(dataDeserialize(blockId, data))
}
logDebug("Data is null")
}
@@ -431,7 +435,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
val blockId = blockMessage.getId
results.put(new FetchResult(
- blockId, sizeMap(blockId), () => dataDeserialize(blockMessage.getData)))
+ blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData)))
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}
}
@@ -609,7 +613,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
- bytesAfterPut = dataSerialize(valuesAfterPut)
+ bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
}
@@ -787,24 +791,36 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def shouldCompress(blockId: String): Boolean = {
+ if (blockId.startsWith("shuffle_")) {
+ compressShuffle
+ } else if (blockId.startsWith("broadcast_")) {
+ compressBroadcast
+ } else if (blockId.startsWith("rdd_")) {
+ compressRdds
+ } else {
+ false // Won't happen in a real cluster, but it can in tests
+ }
+ }
+
/**
- * Wrap an output stream for compression if block compression is enabled
+ * Wrap an output stream for compression if block compression is enabled for its block type
*/
- def wrapForCompression(s: OutputStream): OutputStream = {
- if (compress) new LZFOutputStream(s) else s
+ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
+ if (shouldCompress(blockId)) new LZFOutputStream(s) else s
}
/**
- * Wrap an input stream for compression if block compression is enabled
+ * Wrap an input stream for compression if block compression is enabled for its block type
*/
- def wrapForCompression(s: InputStream): InputStream = {
- if (compress) new LZFInputStream(s) else s
+ def wrapForCompression(blockId: String, s: InputStream): InputStream = {
+ if (shouldCompress(blockId)) new LZFInputStream(s) else s
}
- def dataSerialize(values: Iterator[Any]): ByteBuffer = {
+ def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = {
val byteStream = new FastByteArrayOutputStream(4096)
val ser = serializer.newInstance()
- ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close()
+ ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
}
@@ -813,10 +829,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
+ def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = {
bytes.rewind()
- val ser = serializer.newInstance()
- ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes, true))).asIterator
+ val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+ serializer.newInstance().deserializeStream(stream).asIterator
}
def stop() {
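
The thread running through all the BlockManager changes above is that dataSerialize and dataDeserialize now take the block ID, so the compression decision is made per block type rather than globally. A hypothetical round trip under the new signatures (`blockManager` stands for an already-initialized instance; the `rdd_<rddId>_<split>` ID format matches the prefix checked in shouldCompress):

```scala
// Hypothetical usage sketch, not part of this patch.
val blockId = "rdd_3_7" // shouldCompress() consults spark.rdd.compress for this prefix
val values = Seq("a", "b", "c")

// Serialize through the blockId-aware helper; the stream is wrapped in an
// LZFOutputStream only if compression is enabled for "rdd_" blocks.
val bytes: java.nio.ByteBuffer = blockManager.dataSerialize(blockId, values.iterator)

// Deserialization applies the same per-type decision on the way back in.
val restored: Iterator[Any] = blockManager.dataDeserialize(blockId, bytes)
```
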
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3e6f09257a..fd92a3dc67 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -53,7 +53,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
- val fileOut = blockManager.wrapForCompression(
+ val fileOut = blockManager.wrapForCompression(blockId,
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
@@ -83,7 +83,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- getBytes(blockId).map(blockManager.dataDeserialize(_))
+ getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
}
override def remove(blockId: String) {
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 24e1d75013..e9288fdf43 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -31,7 +31,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
bytes.rewind()
- val values = blockManager.dataDeserialize(bytes)
+ val values = blockManager.dataDeserialize(blockId, bytes)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
@@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
tryToPut(blockId, elements, sizeEstimate, true)
PutResult(sizeEstimate, Left(elements.iterator))
} else {
- val bytes = blockManager.dataSerialize(values)
+ val bytes = blockManager.dataSerialize(blockId, values)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes))
}
@@ -71,7 +71,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry == null) {
None
} else if (entry.deserialized) {
- Some(blockManager.dataSerialize(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+ Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
} else {
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
@@ -87,7 +87,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
- Some(blockManager.dataDeserialize(buffer))
+ Some(blockManager.dataDeserialize(blockId, buffer))
}
}
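
To make the new dispatch rule easy to eyeball, here is a standalone restatement of shouldCompress with the defaults introduced by this commit (demo code only, not part of Spark):

```scala
object ShouldCompressDemo {
  // Defaults as introduced by this commit.
  val compressShuffle   = System.getProperty("spark.shuffle.compress", "true").toBoolean
  val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
  val compressRdds      = System.getProperty("spark.rdd.compress", "false").toBoolean

  def shouldCompress(blockId: String): Boolean =
    if (blockId.startsWith("shuffle_")) compressShuffle
    else if (blockId.startsWith("broadcast_")) compressBroadcast
    else if (blockId.startsWith("rdd_")) compressRdds
    else false // e.g. blocks created directly in tests

  def main(args: Array[String]) {
    for (id <- Seq("shuffle_0_1_2", "broadcast_0", "rdd_3_7", "test_block")) {
      println(id + " -> compress=" + shouldCompress(id))
    }
  }
}
```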