aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/spark/storage/BlockManager.scala 48
-rw-r--r-- core/src/main/scala/spark/storage/DiskStore.scala 4
-rw-r--r-- core/src/main/scala/spark/storage/MemoryStore.scala 8
-rw-r--r-- core/src/test/scala/spark/storage/BlockManagerSuite.scala 96
-rw-r--r-- docs/configuration.md 41
5 files changed, 145 insertions, 52 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 6c568cc2b0..91b7bebfb3 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -109,7 +109,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
- val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
+ val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
+ // Whether to compress RDD partitions that are stored serialized
+ val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
initialize()
@@ -252,7 +256,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
- return Some(dataDeserialize(bytes))
+ return Some(dataDeserialize(blockId, bytes))
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
@@ -355,7 +359,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port))
if (data != null) {
logDebug("Data is not null: " + data)
- return Some(dataDeserialize(data))
+ return Some(dataDeserialize(blockId, data))
}
logDebug("Data is null")
}
@@ -431,7 +435,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
val blockId = blockMessage.getId
results.put(new FetchResult(
- blockId, sizeMap(blockId), () => dataDeserialize(blockMessage.getData)))
+ blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData)))
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}
}
@@ -609,7 +613,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
- bytesAfterPut = dataSerialize(valuesAfterPut)
+ bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
}
@@ -787,24 +791,36 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def shouldCompress(blockId: String): Boolean = {
+ if (blockId.startsWith("shuffle_")) {
+ compressShuffle
+ } else if (blockId.startsWith("broadcast_")) {
+ compressBroadcast
+ } else if (blockId.startsWith("rdd_")) {
+ compressRdds
+ } else {
+ false // Won't happen in a real cluster, but it can in tests
+ }
+ }
+
/**
- * Wrap an output stream for compression if block compression is enabled
+ * Wrap an output stream for compression if block compression is enabled for its block type
*/
- def wrapForCompression(s: OutputStream): OutputStream = {
- if (compress) new LZFOutputStream(s) else s
+ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
+ if (shouldCompress(blockId)) new LZFOutputStream(s) else s
}
/**
- * Wrap an input stream for compression if block compression is enabled
+ * Wrap an input stream for compression if block compression is enabled for its block type
*/
- def wrapForCompression(s: InputStream): InputStream = {
- if (compress) new LZFInputStream(s) else s
+ def wrapForCompression(blockId: String, s: InputStream): InputStream = {
+ if (shouldCompress(blockId)) new LZFInputStream(s) else s
}
- def dataSerialize(values: Iterator[Any]): ByteBuffer = {
+ def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = {
val byteStream = new FastByteArrayOutputStream(4096)
val ser = serializer.newInstance()
- ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close()
+ ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
}
@@ -813,10 +829,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
+ def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = {
bytes.rewind()
- val ser = serializer.newInstance()
- ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes, true))).asIterator
+ val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+ serializer.newInstance().deserializeStream(stream).asIterator
}
def stop() {
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3e6f09257a..fd92a3dc67 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -53,7 +53,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
- val fileOut = blockManager.wrapForCompression(
+ val fileOut = blockManager.wrapForCompression(blockId,
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
@@ -83,7 +83,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- getBytes(blockId).map(blockManager.dataDeserialize(_))
+ getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
}
override def remove(blockId: String) {
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 24e1d75013..e9288fdf43 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -31,7 +31,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
bytes.rewind()
- val values = blockManager.dataDeserialize(bytes)
+ val values = blockManager.dataDeserialize(blockId, bytes)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
@@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
tryToPut(blockId, elements, sizeEstimate, true)
PutResult(sizeEstimate, Left(elements.iterator))
} else {
- val bytes = blockManager.dataSerialize(values)
+ val bytes = blockManager.dataSerialize(blockId, values)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes))
}
@@ -71,7 +71,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry == null) {
None
} else if (entry.deserialized) {
- Some(blockManager.dataSerialize(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+ Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
} else {
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
@@ -87,7 +87,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
- Some(blockManager.dataDeserialize(buffer))
+ Some(blockManager.dataDeserialize(blockId, buffer))
}
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 213a423fef..31b33eae09 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -13,10 +13,14 @@ import spark.SizeEstimator
import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+ var store: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
- var oldArch: String = _
- var oldOops: String = _
+ var oldArch: String = null
+ var oldOops: String = null
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
@@ -30,6 +34,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
after {
+ if (store != null) {
+ store.stop()
+ }
actorSystem.shutdown()
actorSystem.awaitTermination()
actorSystem = null
@@ -49,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("manager-master interaction") {
- val store = new BlockManager(master, new KryoSerializer, 2000)
+ store = new BlockManager(master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -79,7 +86,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -98,7 +105,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -117,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -136,7 +143,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -159,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -172,7 +179,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -187,7 +194,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -202,7 +209,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -217,7 +224,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -232,7 +239,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -257,7 +264,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -281,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -327,11 +334,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- val store = new BlockManager(master, new KryoSerializer, 500)
+ store = new BlockManager(master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingle("a2") != None, "a2 was not in store")
}
+
+ test("block compression") {
+ try {
+ System.setProperty("spark.shuffle.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.shuffle.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.broadcast.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.broadcast.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.rdd.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.rdd.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
+ store.stop()
+ store = null
+
+ // Other block types stay uncompressed; store serialized so getSize is the stored byte count
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+ store.stop()
+ store = null
+ } finally {
+ System.clearProperty("spark.shuffle.compress")
+ System.clearProperty("spark.broadcast.compress")
+ System.clearProperty("spark.rdd.compress")
+ }
+ }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 0987f7f7b1..db90b5bc16 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -113,29 +113,34 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td>spark.blockManager.compress</td>
- <td>false</td>
+ <td>spark.storage.memoryFraction</td>
+ <td>0.66</td>
<td>
- Set to "true" to have Spark compress map output files, RDDs that get cached on disk,
- and RDDs that get cached in serialized form. Generally a good idea when dealing with
- large datasets, but might add some CPU overhead.
+ Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
+ generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
+ it if you configure your own old generation size.
+ </td>
+</tr>
+<tr>
+ <td>spark.shuffle.compress</td>
+ <td>true</td>
+ <td>
+ Whether to compress map output files. Generally a good idea.
</td>
</tr>
<tr>
<td>spark.broadcast.compress</td>
- <td>false</td>
+ <td>true</td>
<td>
- Set to "true" to have Spark compress broadcast variables before sending them.
- Generally a good idea when broadcasting large values.
+ Whether to compress broadcast variables before sending them. Generally a good idea.
</td>
</tr>
<tr>
- <td>spark.storage.memoryFraction</td>
- <td>0.66</td>
+ <td>spark.rdd.compress</td>
+ <td>false</td>
<td>
- Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
- generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
- it if you configure your own old generation size.
+ Whether to compress serialized RDD partitions (e.g. for <code>StorageLevel.MEMORY_ONLY_SER</code>).
+ Can save substantial space at the cost of some extra CPU time.
</td>
</tr>
<tr>
@@ -181,10 +186,18 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.akka.threads</td>
+ <td>4</td>
+ <td>
+ Number of actor threads to use for communication. Can be useful to increase on large clusters
+ when the master has a lot of CPU cores.
+ </td>
+</tr>
+<tr>
<td>spark.master.host</td>
<td>(local hostname)</td>
<td>
- Hostname for the master to listen on (it will bind to this hostname's IP address).
+ Hostname or IP address for the master to listen on.
</td>
</tr>
<tr>