-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala    2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala      26
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala         3
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala              16
-rw-r--r--  docs/configuration.md                                     17
5 files changed, 56 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index a98cbb6994..64037fb2d5 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -65,7 +65,7 @@ private object HttpBroadcast extends Logging {
synchronized {
if (!initialized) {
bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- compress = System.getProperty("spark.compress", "false").toBoolean
+ compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
if (isMaster) {
createServer()
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 15748b70d5..bae5c8c567 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -5,7 +5,7 @@ import akka.util.Duration
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
@@ -15,6 +15,7 @@ import scala.collection.JavaConversions._
import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
import spark.network._
import spark.util.ByteBufferInputStream
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
@@ -76,7 +77,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This will be removed after cacheTracker is removed from the code base.
var cacheTracker: CacheTracker = null
- val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties()
+ val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
+
+ val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
initLogging()
@@ -605,10 +608,21 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ /** Wrap an output stream for compression if block compression is enabled */
+ def wrapForCompression(s: OutputStream): OutputStream = {
+ if (compress) new LZFOutputStream(s) else s
+ }
+
+ /** Wrap an input stream for compression if block compression is enabled */
+ def wrapForCompression(s: InputStream): InputStream = {
+ if (compress) new LZFInputStream(s) else s
+ }
+
def dataSerialize(values: Iterator[Any]): ByteBuffer = {
/*serializer.newInstance().serializeMany(values)*/
val byteStream = new FastByteArrayOutputStream(4096)
- serializer.newInstance().serializeStream(byteStream).writeAll(values).close()
+ val ser = serializer.newInstance()
+ ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close()
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
}
@@ -616,7 +630,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
bytes.rewind()
val ser = serializer.newInstance()
- return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator
+ return ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes))).asIterator
}
def stop() {
@@ -630,11 +644,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
- def getNumParallelFetchesFromSystemProperties(): Int = {
+ def getNumParallelFetchesFromSystemProperties: Int = {
System.getProperty("spark.blockManager.parallelFetches", "4").toInt
}
- def getMaxMemoryFromSystemProperties(): Long = {
+ def getMaxMemoryFromSystemProperties: Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
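For reference, a minimal standalone sketch of the LZF round-trip that wrapForCompression performs when spark.blockManager.compress is true. It assumes the com.ning.compress.lzf library imported above is on the classpath; the object name and payload string are illustrative only, not part of this change:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

object LzfRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()

    // Write path: wrap the raw stream, as wrapForCompression(s: OutputStream) does
    val out = new LZFOutputStream(buffer)
    out.write("some serialized block data".getBytes("UTF-8"))
    out.close()

    // Read path: wrap the raw stream, as wrapForCompression(s: InputStream) does
    val in = new LZFInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    println(new String(bytes, "UTF-8"))
  }
}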
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index d505df66a7..7696aa9567 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -233,7 +233,8 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to write values for block " + blockId)
val file = createFile(blockId)
- val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+ val fileOut = blockManager.wrapForCompression(
+ new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 9d7e2591f1..90760b8a85 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -69,6 +69,22 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
assert(valuesFor2.toList.sorted === List(1))
}
+ test("groupByKey with compression") {
+ try {
+ System.setProperty("spark.blockManager.compress", "true")
+ sc = new SparkContext("local", "test")
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+ val groups = pairs.groupByKey(4).collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ } finally {
+ System.setProperty("spark.blockManager.compress", "false")
+ }
+ }
+
test("reduceByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
diff --git a/docs/configuration.md b/docs/configuration.md
index 4e47ca16e8..e4d5b21a12 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -112,6 +112,23 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.blockManager.compress</td>
+ <td>false</td>
+ <td>
+ Set to "true" to have Spark compress map output files, RDDs that get cached on disk,
+ and RDDs that get cached in serialized form. Generally a good idea when dealing with
+ large datasets, but might add some CPU overhead.
+ </td>
+</tr>
+<tr>
+ <td>spark.broadcast.compress</td>
+ <td>true</td>
+ <td>
+ Set to "true" to have Spark compress broadcast variables before sending them.
+ Generally a good idea when broadcasting large values.
+ </td>
+</tr>
+<tr>
<td>spark.storage.memoryFraction</td>
<td>0.66</td>
<td>
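A hedged sketch of how an application might enable both new flags before creating a SparkContext; the property names come from the table above, while the driver code itself is illustrative, modeled on the test added in ShuffleSuite:

import spark.SparkContext

object CompressionFlagsSketch {
  def main(args: Array[String]): Unit = {
    // Compress shuffle map outputs and serialized cached blocks (LZF)
    System.setProperty("spark.blockManager.compress", "true")
    // Compress broadcast variables before they are served to workers
    System.setProperty("spark.broadcast.compress", "true")

    val sc = new SparkContext("local", "compression sketch")
    val pairs = sc.parallelize(Array((1, 1), (1, 2), (2, 1)), 2)
    // groupByKey forces a shuffle, so map output files go through wrapForCompression
    println(pairs.groupByKey(2).collect().toList)
    sc.stop()
  }
}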