diff options
Diffstat (limited to 'core')
4 files changed, 39 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index a98cbb6994..64037fb2d5 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -65,7 +65,7 @@ private object HttpBroadcast extends Logging { synchronized { if (!initialized) { bufferSize = System.getProperty("spark.buffer.size", "65536").toInt - compress = System.getProperty("spark.compress", "false").toBoolean + compress = System.getProperty("spark.broadcast.compress", "true").toBoolean if (isMaster) { createServer() } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 15748b70d5..bae5c8c567 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -5,7 +5,7 @@ import akka.util.Duration import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput} import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} @@ -15,6 +15,7 @@ import scala.collection.JavaConversions._ import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils} import spark.network._ import spark.util.ByteBufferInputStream +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} class BlockManagerId(var ip: String, var port: Int) extends Externalizable { @@ -76,7 +77,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // TODO: This will be removed after cacheTracker is removed from the code base. var cacheTracker: CacheTracker = null - val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties() + val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties + + val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean initLogging() @@ -605,10 +608,21 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + /** Wrap an output stream for compression if block compression is enabled */ + def wrapForCompression(s: OutputStream): OutputStream = { + if (compress) new LZFOutputStream(s) else s + } + + /** Wrap an input stream for compression if block compression is enabled */ + def wrapForCompression(s: InputStream): InputStream = { + if (compress) new LZFInputStream(s) else s + } + def dataSerialize(values: Iterator[Any]): ByteBuffer = { /*serializer.newInstance().serializeMany(values)*/ val byteStream = new FastByteArrayOutputStream(4096) - serializer.newInstance().serializeStream(byteStream).writeAll(values).close() + val ser = serializer.newInstance() + ser.serializeStream(wrapForCompression(byteStream)).writeAll(values).close() byteStream.trim() ByteBuffer.wrap(byteStream.array) } @@ -616,7 +630,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = { bytes.rewind() val ser = serializer.newInstance() - return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator + return ser.deserializeStream(wrapForCompression(new ByteBufferInputStream(bytes))).asIterator } def stop() { @@ -630,11 +644,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m object BlockManager { - def getNumParallelFetchesFromSystemProperties(): Int = { + def getNumParallelFetchesFromSystemProperties: Int = { System.getProperty("spark.blockManager.parallelFetches", "4").toInt } - def getMaxMemoryFromSystemProperties(): Long = { + def getMaxMemoryFromSystemProperties: Long = { val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble (Runtime.getRuntime.maxMemory * memoryFraction).toLong } diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index d505df66a7..7696aa9567 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -233,7 +233,8 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to write values for block " + blockId) val file = createFile(blockId) - val fileOut = new FastBufferedOutputStream(new FileOutputStream(file)) + val fileOut = blockManager.wrapForCompression( + new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) objOut.writeAll(values) objOut.close() diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 9d7e2591f1..90760b8a85 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -69,6 +69,22 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { assert(valuesFor2.toList.sorted === List(1)) } + test("groupByKey with compression") { + try { + System.setProperty("spark.blockManager.compress", "true") + sc = new SparkContext("local", "test") + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) + val groups = pairs.groupByKey(4).collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } finally { + System.setProperty("spark.blockManager.compress", "false") + } + } + test("reduceByKey") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) |