diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-07 11:30:53 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-07 11:30:53 -0700 |
commit | efc5423210d1aadeaea78273a4a8f10425753079 (patch) | |
tree | 86e0ca94f41ef5a17b92be3d7be45b77f762e1f8 | |
parent | 039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3 (diff) | |
download | spark-efc5423210d1aadeaea78273a4a8f10425753079.tar.gz spark-efc5423210d1aadeaea78273a4a8f10425753079.tar.bz2 spark-efc5423210d1aadeaea78273a4a8f10425753079.zip |
Made compression configurable separately for shuffle, broadcast and RDDs
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 48 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/DiskStore.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/MemoryStore.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 96 | ||||
-rw-r--r-- | docs/configuration.md | 41 |
5 files changed, 145 insertions, 52 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 6c568cc2b0..91b7bebfb3 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -109,7 +109,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val maxBytesInFlight = System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 - val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean + val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean + val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean + // Whether to compress RDD partitions that are stored serialized + val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean + val host = System.getProperty("spark.hostname", Utils.localHostName()) initialize() @@ -252,7 +256,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m copyForMemory.put(bytes) memoryStore.putBytes(blockId, copyForMemory, level) bytes.rewind() - return Some(dataDeserialize(bytes)) + return Some(dataDeserialize(blockId, bytes)) case None => throw new Exception("Block " + blockId + " not found on disk, though it should be") } @@ -355,7 +359,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port)) if (data != null) { logDebug("Data is not null: " + data) - return Some(dataDeserialize(data)) + return Some(dataDeserialize(blockId, data)) } logDebug("Data is null") } @@ -431,7 +435,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } val blockId = blockMessage.getId results.put(new FetchResult( - blockId, sizeMap(blockId), () => dataDeserialize(blockMessage.getData))) + blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData))) logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } } @@ -609,7 +613,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new SparkException( "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.") } - bytesAfterPut = dataSerialize(valuesAfterPut) + bytesAfterPut = dataSerialize(blockId, valuesAfterPut) } replicate(blockId, bytesAfterPut, level) } @@ -787,24 +791,36 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + def shouldCompress(blockId: String): Boolean = { + if (blockId.startsWith("shuffle_")) { + compressShuffle + } else if (blockId.startsWith("broadcast_")) { + compressBroadcast + } else if (blockId.startsWith("rdd_")) { + compressRdds + } else { + false // Won't happen in a real cluster, but it can in tests + } + } + /** - * Wrap an output stream for compression if block compression is enabled + * Wrap an output stream for compression if block compression is enabled for its block type */ - def wrapForCompression(s: OutputStream): OutputStream = { - if (compress) new LZFOutputStream(s) else s + def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { + if (shouldCompress(blockId)) new LZFOutputStream(s) else s } /** - * Wrap an input stream for compression if block compression is enabled + * Wrap an input stream for compression if block compression is enabled for its block type */ - def wrapForCompression(s: InputStream): InputStream = { - if (compress) new LZFInputStream(s) else s + def wrapForCompression(blockId: String, s: InputStream): InputStream = { + if (shouldCompress(blockId)) new LZFInputStream(s) else s } - def dataSerialize(values: Iterator[Any]): ByteBuffer = { + def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) val ser = serializer.newInstance() - ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close() + ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() byteStream.trim() ByteBuffer.wrap(byteStream.array) } @@ -813,10 +829,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = { + def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = { bytes.rewind() - val ser = serializer.newInstance() - ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes, true))).asIterator + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator } def stop() { diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 3e6f09257a..fd92a3dc67 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -53,7 +53,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis val file = createFile(blockId) - val fileOut = blockManager.wrapForCompression( + val fileOut = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) objOut.writeAll(values) @@ -83,7 +83,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } override def getValues(blockId: String): Option[Iterator[Any]] = { - getBytes(blockId).map(blockManager.dataDeserialize(_)) + getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) } override def remove(blockId: String) { diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 24e1d75013..e9288fdf43 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -31,7 +31,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { if (level.deserialized) { bytes.rewind() - val values = blockManager.dataDeserialize(bytes) + val values = blockManager.dataDeserialize(blockId, bytes) val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) @@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) tryToPut(blockId, elements, sizeEstimate, true) PutResult(sizeEstimate, Left(elements.iterator)) } else { - val bytes = blockManager.dataSerialize(values) + val bytes = blockManager.dataSerialize(blockId, values) tryToPut(blockId, bytes, bytes.limit, false) PutResult(bytes.limit(), Right(bytes)) } @@ -71,7 +71,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry == null) { None } else if (entry.deserialized) { - Some(blockManager.dataSerialize(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) + Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) } else { Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data } @@ -87,7 +87,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) } else { val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data - Some(blockManager.dataDeserialize(buffer)) + Some(blockManager.dataDeserialize(blockId, buffer)) } } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 213a423fef..31b33eae09 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -13,10 +13,14 @@ import spark.SizeEstimator import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { + var store: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null - var oldArch: String = _ - var oldOops: String = _ + var oldArch: String = null + var oldOops: String = null + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + val serializer = new KryoSerializer before { actorSystem = ActorSystem("test") @@ -30,6 +34,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { + if (store != null) { + store.stop() + } actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null @@ -49,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("manager-master interaction") { - val store = new BlockManager(master, new KryoSerializer, 2000) + store = new BlockManager(master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -79,7 +86,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -98,7 +105,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -117,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -136,7 +143,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -159,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -172,7 +179,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -187,7 +194,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -202,7 +209,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -217,7 +224,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -232,7 +239,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -257,7 +264,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -281,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -327,11 +334,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - val store = new BlockManager(master, new KryoSerializer, 500) + store = new BlockManager(master, serializer, 500) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") assert(store.getSingle("a2") != None, "a2 was not in store") } + + test("block compression") { + try { + System.setProperty("spark.shuffle.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.shuffle.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") + store.stop() + store = null + + // Check that any other block types are also kept uncompressed + store = new BlockManager(master, serializer, 2000) + store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") + store.stop() + store = null + } finally { + System.clearProperty("spark.shuffle.compress") + System.clearProperty("spark.broadcast.compress") + System.clearProperty("spark.rdd.compress") + } + } } diff --git a/docs/configuration.md b/docs/configuration.md index 0987f7f7b1..db90b5bc16 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -113,29 +113,34 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td>spark.blockManager.compress</td> - <td>false</td> + <td>spark.storage.memoryFraction</td> + <td>0.66</td> <td> - Set to "true" to have Spark compress map output files, RDDs that get cached on disk, - and RDDs that get cached in serialized form. Generally a good idea when dealing with - large datasets, but might add some CPU overhead. + Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" + generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase + it if you configure your own old generation size. + </td> +</tr> +<tr> + <td>spark.shuffle.compress</td> + <td>true</td> + <td> + Whether to compress map output files. Generally a good idea. </td> </tr> <tr> <td>spark.broadcast.compress</td> - <td>false</td> + <td>true</td> <td> - Set to "true" to have Spark compress broadcast variables before sending them. - Generally a good idea when broadcasting large values. + Whether to compress broadcast variables before sending them. Generally a good idea. </td> </tr> <tr> - <td>spark.storage.memoryFraction</td> - <td>0.66</td> + <td>spark.rdd.compress</td> + <td>false</td> <td> - Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase - it if you configure your own old generation size. + Whether to compress serialized RDD partitions (e.g. for <code>StorageLevel.MEMORY_ONLY_SER</code>). + Can save substantial space at the cost of some extra CPU time. </td> </tr> <tr> @@ -181,10 +186,18 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td>spark.akka.threads</td> + <td>4</td> + <td> + Number of actor threads to use for communication. Can be useful to increase on large clusters + when the master has a lot of CPU cores. + </td> +</tr> +<tr> <td>spark.master.host</td> <td>(local hostname)</td> <td> - Hostname for the master to listen on (it will bind to this hostname's IP address). + Hostname or IP address for the master to listen on. </td> </tr> <tr> |