author    Patrick Wendell <pwendell@gmail.com>  2014-01-20 19:00:48 -0800
committer Patrick Wendell <pwendell@gmail.com>  2014-01-20 19:00:48 -0800
commit    c324ac10ee208c53dabd54e5c0e1885aea456811 (patch)
tree      485f5f8facbae362b723f5f00270e4fad66cdd8d
parent    1b299142a8d5feb70677dce993127de466266ff6 (diff)
Force use of LZF when spilling data
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 42
-rw-r--r--  docs/configuration.md | 4
2 files changed, 39 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8df8b4f83e..792f29de60 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,38 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
- val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+ /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+ * closes and re-opens serialization and compression streams within each file. This makes some
+ * assumptions about the way that serialization and compression streams work, specifically:
+ *
+ * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+ *
+ * 2) Several compression streams can be opened, written to, and flushed on the write path
+ * while only one compression input stream is created on the read path.
+ *
+ * In practice, (1) is only true for the Java serializer, so we add a special fix below to make
+ * it work for Kryo. (2) is only true for LZF and not Snappy, so we force the use of LZF here.
+ *
+ * To avoid making these assumptions we should create an intermediate stream that batches
+ * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+ * This is a bit tricky because, within each segment, you'd need to track the total number
+ * of bytes written and then rewind and write it at the beginning of the segment. This will
+ * most likely require using the file channel API.
+ */
+
+ val codec = new LZFCompressionCodec(sparkConf)
+
+ def wrapForCompression(outputStream: OutputStream) = {
+ blockManager.shouldCompress(blockId) match {
+ case true =>
+ codec.compressedOutputStream(outputStream)
+ case false =>
+ outputStream
+ }
+ }
+
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
- compressStream, syncWrites)
+ wrapForCompression, syncWrites)
var writer = getNewWriter
var objectsWritten = 0
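
The batching scheme described in the comment above is left as future work by this patch. Purely as an illustration of that idea, and assuming a hypothetical writeSegment helper (nothing below is part of this commit), rewinding to back-patch each segment's length with the file API could look roughly like this in Scala:

    import java.io.RandomAccessFile

    // Hypothetical sketch: reserve space for the segment length, write the segment,
    // then seek back and record how many bytes it actually occupied, so the read path
    // knows exactly where each segment ends and never prefetches past it.
    def writeSegment(raf: RandomAccessFile)(writeBody: RandomAccessFile => Unit): Unit = {
      val headerPos = raf.getFilePointer
      raf.writeLong(0L)                    // placeholder for the segment length
      val bodyStart = raf.getFilePointer
      writeBody(raf)                       // serialize and compress one batch of objects here
      val bodyEnd = raf.getFilePointer
      raf.seek(headerPos)
      raf.writeLong(bodyEnd - bodyStart)   // rewind and patch in the real length
      raf.seek(bodyEnd)                    // continue appending after the segment
    }

A reader could then consume one length-prefixed segment at a time and hand each segment to a fresh decompression and deserialization stream, which is what would remove assumptions (1) and (2).
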
diff --git a/docs/configuration.md b/docs/configuration.md
index 4c2e9cc479..be548e372d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.shuffle.spill.compress</td>
<td>true</td>
<td>
- Whether to compress data spilled during shuffles.
+ Whether to compress data spilled during shuffles. If enabled, spill compression
+ always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+ regardless of the value of `spark.io.compression.codec`.
</td>
</tr>
<tr>
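
For context, a minimal usage sketch of the documented setting (not part of the patch; the application name is arbitrary): since spills always use LZF when compression is enabled, the only spill-specific choice left to a user is whether to compress spills at all.

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: spilled shuffle data is compressed with LZF whenever
    // spark.shuffle.spill.compress is true; to opt out, disable spill
    // compression entirely rather than changing spark.io.compression.codec.
    val conf = new SparkConf()
      .setAppName("SpillCompressionExample")   // hypothetical app name
      .set("spark.shuffle.spill.compress", "false")
    val sc = new SparkContext(conf)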