diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-07 11:30:53 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-07 11:30:53 -0700 |
commit | efc5423210d1aadeaea78273a4a8f10425753079 (patch) | |
tree | 86e0ca94f41ef5a17b92be3d7be45b77f762e1f8 /core/src/main | |
parent | 039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3 (diff) | |
download | spark-efc5423210d1aadeaea78273a4a8f10425753079.tar.gz spark-efc5423210d1aadeaea78273a4a8f10425753079.tar.bz2 spark-efc5423210d1aadeaea78273a4a8f10425753079.zip |
Made compression configurable separately for shuffle, broadcast and RDDs
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 48 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/DiskStore.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/MemoryStore.scala | 8 |
3 files changed, 38 insertions, 22 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 6c568cc2b0..91b7bebfb3 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -109,7 +109,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val maxBytesInFlight = System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 - val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean + val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean + val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean + // Whether to compress RDD partitions that are stored serialized + val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean + val host = System.getProperty("spark.hostname", Utils.localHostName()) initialize() @@ -252,7 +256,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m copyForMemory.put(bytes) memoryStore.putBytes(blockId, copyForMemory, level) bytes.rewind() - return Some(dataDeserialize(bytes)) + return Some(dataDeserialize(blockId, bytes)) case None => throw new Exception("Block " + blockId + " not found on disk, though it should be") } @@ -355,7 +359,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port)) if (data != null) { logDebug("Data is not null: " + data) - return Some(dataDeserialize(data)) + return Some(dataDeserialize(blockId, data)) } logDebug("Data is null") } @@ -431,7 +435,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } val blockId = blockMessage.getId results.put(new FetchResult( - blockId, sizeMap(blockId), () => dataDeserialize(blockMessage.getData))) + blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData))) logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } } @@ -609,7 +613,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new SparkException( "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.") } - bytesAfterPut = dataSerialize(valuesAfterPut) + bytesAfterPut = dataSerialize(blockId, valuesAfterPut) } replicate(blockId, bytesAfterPut, level) } @@ -787,24 +791,36 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + def shouldCompress(blockId: String): Boolean = { + if (blockId.startsWith("shuffle_")) { + compressShuffle + } else if (blockId.startsWith("broadcast_")) { + compressBroadcast + } else if (blockId.startsWith("rdd_")) { + compressRdds + } else { + false // Won't happen in a real cluster, but it can in tests + } + } + /** - * Wrap an output stream for compression if block compression is enabled + * Wrap an output stream for compression if block compression is enabled for its block type */ - def wrapForCompression(s: OutputStream): OutputStream = { - if (compress) new LZFOutputStream(s) else s + def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { + if (shouldCompress(blockId)) new LZFOutputStream(s) else s } /** - * Wrap an input stream for compression if block compression is enabled + * Wrap an input stream for compression if block compression is enabled for its block type */ - def wrapForCompression(s: InputStream): InputStream = { - if (compress) new LZFInputStream(s) else s + def wrapForCompression(blockId: String, s: InputStream): InputStream = { + if (shouldCompress(blockId)) new LZFInputStream(s) else s } - def dataSerialize(values: Iterator[Any]): ByteBuffer = { + def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) val ser = serializer.newInstance() - ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close() + ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() byteStream.trim() ByteBuffer.wrap(byteStream.array) } @@ -813,10 +829,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = { + def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = { bytes.rewind() - val ser = serializer.newInstance() - ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes, true))).asIterator + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator } def stop() { diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 3e6f09257a..fd92a3dc67 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -53,7 +53,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis val file = createFile(blockId) - val fileOut = blockManager.wrapForCompression( + val fileOut = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) objOut.writeAll(values) @@ -83,7 +83,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } override def getValues(blockId: String): Option[Iterator[Any]] = { - getBytes(blockId).map(blockManager.dataDeserialize(_)) + getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) } override def remove(blockId: String) { diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 24e1d75013..e9288fdf43 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -31,7 +31,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { if (level.deserialized) { bytes.rewind() - val values = blockManager.dataDeserialize(bytes) + val values = blockManager.dataDeserialize(blockId, bytes) val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) @@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) tryToPut(blockId, elements, sizeEstimate, true) PutResult(sizeEstimate, Left(elements.iterator)) } else { - val bytes = blockManager.dataSerialize(values) + val bytes = blockManager.dataSerialize(blockId, values) tryToPut(blockId, bytes, bytes.limit, false) PutResult(bytes.limit(), Right(bytes)) } @@ -71,7 +71,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry == null) { None } else if (entry.deserialized) { - Some(blockManager.dataSerialize(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) + Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) } else { Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data } @@ -87,7 +87,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) } else { val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data - Some(blockManager.dataDeserialize(buffer)) + Some(blockManager.dataDeserialize(blockId, buffer)) } } |