author    Marcelo Vanzin <vanzin@cloudera.com>  2016-11-30 14:10:32 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-11-30 14:10:32 -0800
commit    93e9d880bf8a144112d74a6897af4e36fcfa5807 (patch)
tree      c7e1f51b4d1a5b344857d874e86649c21ad5357a /core/src/main/scala/org
parent    f135b70fd590438bebb2a54012a6f73074219758 (diff)
[SPARK-18546][CORE] Fix merging shuffle spills when using encryption.
The problem exists because it's not possible to just concatenate encrypted partition data from different spill files: currently each partition has its own initialization vector to set up encryption, while the final merged file should contain a single initialization vector for each merged partition; otherwise iterating over each record becomes really hard.

To fix that, UnsafeShuffleWriter now decrypts the partitions when merging, so that the merged file contains a single initialization vector at the start of each partition's data.

Because it's not possible to do that using the fast transferTo path, UnsafeShuffleWriter falls back to using file streams when merging with encryption enabled. It may be possible to use a hybrid approach with encryption, using an intermediate direct buffer when reading from files and encrypting the data, but that's better left for a separate patch.

As part of the change, DiskBlockObjectWriter now takes a SerializerManager instead of a "wrap stream" closure, since that makes it easier to test the code without having to mock SerializerManager functionality.

Tested with newly added unit tests (UnsafeShuffleWriterSuite for the write side and ExternalAppendOnlyMapSuite for integration), and by running some apps that failed without the fix.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15982 from vanzin/SPARK-18546.
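As a rough illustration of the stream-based slow path described above, here is a minimal Scala sketch (not the actual UnsafeShuffleWriter code, which is Java; mergeSpillsWithStreams, the spills array, and the partitionLengths layout are all assumed for the example):

import java.io.{File, FileInputStream, FileOutputStream}
import org.apache.commons.io.output.CloseShieldOutputStream
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.serializer.SerializerManager

// Decrypt each spill's chunk of a partition while copying, and re-encrypt it
// through a single wrapped stream, so each merged partition gets exactly one
// initialization vector.
def mergeSpillsWithStreams(
    spills: Array[File],                  // assumed: one file per spill
    partitionLengths: Array[Array[Long]], // assumed: on-disk lengths(spill)(partition)
    mergedFile: File,
    serializerManager: SerializerManager): Unit = {
  val numPartitions = partitionLengths(0).length
  val ins = spills.map(new FileInputStream(_))
  val out = new FileOutputStream(mergedFile)
  val buf = new Array[Byte](8192)
  try {
    for (p <- 0 until numPartitions) {
      // The close shield stops closing the cipher stream from closing `out`.
      val encOut = serializerManager.wrapForEncryption(
        new CloseShieldOutputStream(out))
      for (s <- spills.indices) {
        // Bound the raw read to this spill's encrypted partition length,
        // then decrypt while copying.
        val decIn = serializerManager.wrapForEncryption(
          new LimitedInputStream(ins(s), partitionLengths(s)(p)))
        var n = decIn.read(buf)
        while (n != -1) {
          encOut.write(buf, 0, n)
          n = decIn.read(buf)
        }
      }
      encOut.close() // flush remaining cipher output; `out` itself stays open
    }
  } finally {
    ins.foreach(_.close())
    out.close()
  }
}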
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala   | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala           | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala  | 6
3 files changed, 9 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 7371f88657..686305e933 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -75,6 +75,8 @@ private[spark] class SerializerManager(
* loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
+ def encryptionEnabled: Boolean = encryptionKey.isDefined
+
def canUseKryo(ct: ClassTag[_]): Boolean = {
primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}
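The new encryptionEnabled flag lets the shuffle writer detect up front that raw bytes can't simply be concatenated. A hypothetical sketch of the kind of check involved (sparkConf and serializerManager are assumed in scope; the real decision in UnsafeShuffleWriter also takes the compression codec into account):

// transferTo-style fast merging concatenates raw file bytes, which is
// invalid for encrypted data, so it must be disabled when encryption is on.
val fastMergeEnabled = sparkConf.getBoolean("spark.file.transferTo", true)
val canUseFastMerge = fastMergeEnabled && !serializerManager.encryptionEnabled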
@@ -129,7 +131,7 @@ private[spark] class SerializerManager(
/**
* Wrap an input stream for encryption if shuffle encryption is enabled
*/
- private[this] def wrapForEncryption(s: InputStream): InputStream = {
+ def wrapForEncryption(s: InputStream): InputStream = {
encryptionKey
.map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }
.getOrElse(s)
@@ -138,7 +140,7 @@ private[spark] class SerializerManager(
/**
* Wrap an output stream for encryption if shuffle encryption is enabled
*/
- private[this] def wrapForEncryption(s: OutputStream): OutputStream = {
+ def wrapForEncryption(s: OutputStream): OutputStream = {
encryptionKey
.map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }
.getOrElse(s)
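With both overloads now public, callers outside SerializerManager can decrypt and re-encrypt data directly. A small round-trip sketch (both wraps are no-ops when no encryption key is configured; serializerManager is assumed in scope):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

val baos = new ByteArrayOutputStream()
val encOut = serializerManager.wrapForEncryption(baos)
encOut.write("hello".getBytes("UTF-8"))
encOut.close() // finishes the cipher stream: IV + ciphertext end up in baos

val decIn = serializerManager.wrapForEncryption(
  new ByteArrayInputStream(baos.toByteArray))
val plain = new ByteArrayOutputStream()
val buf = new Array[Byte](64)
var n = decIn.read(buf)
while (n != -1) {
  plain.write(buf, 0, n)
  n = decIn.read(buf)
}
assert(new String(plain.toByteArray, "UTF-8") == "hello")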
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 982b83324e..04521c9159 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -62,7 +62,7 @@ private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
- serializerManager: SerializerManager,
+ val serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
@@ -745,9 +745,8 @@ private[spark] class BlockManager(
serializerInstance: SerializerInstance,
bufferSize: Int,
writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
- val wrapStream: OutputStream => OutputStream = serializerManager.wrapStream(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
- new DiskBlockObjectWriter(file, serializerInstance, bufferSize, wrapStream,
+ new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize,
syncWrites, writeMetrics, blockId)
}
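The commit message points out that passing the SerializerManager itself (now exposed as a val on BlockManager) mainly helps testing: a suite can construct the writer against a real SerializerManager, with or without an encryption key, instead of mocking a wrap-stream closure. A hypothetical test-side sketch, with file, serializer, conf, writeMetrics and blockId assumed in scope:

import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.DiskBlockObjectWriter

// encryptionKey = None exercises the plain path; Some(key) the encrypted one.
val serializerManager = new SerializerManager(serializer, conf, encryptionKey = None)
val writer = new DiskBlockObjectWriter(
  file,
  serializerManager,
  serializer.newInstance(),
  /* bufferSize = */ 4096,
  /* syncWrites = */ false,
  writeMetrics,
  blockId)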
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index a499827ae1..3cb12fca7d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -22,7 +22,7 @@ import java.nio.channels.FileChannel
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.internal.Logging
-import org.apache.spark.serializer.{SerializationStream, SerializerInstance}
+import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
import org.apache.spark.util.Utils
/**
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class DiskBlockObjectWriter(
val file: File,
+ serializerManager: SerializerManager,
serializerInstance: SerializerInstance,
bufferSize: Int,
- wrapStream: OutputStream => OutputStream,
syncWrites: Boolean,
// These write metrics concurrently shared with other active DiskBlockObjectWriters who
// are themselves performing writes. All updates must be relative.
@@ -116,7 +116,7 @@ private[spark] class DiskBlockObjectWriter(
initialized = true
}
- bs = wrapStream(mcs)
+ bs = serializerManager.wrapStream(blockId, mcs)
objOut = serializerInstance.serializeStream(bs)
streamOpen = true
this
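For reference, the stream stack built by open() after this change is roughly equivalent to the sketch below: wrapStream applies compression and encryption with encryption closest to the file, so a record is serialized, then compressed, then encrypted on its way to disk. In the real code the file stream is wrapped in a metrics/manual-close stream (mcs); serializerManager, serializerInstance, blockId, file, and record are assumed in scope here:

import java.io.FileOutputStream

// record -> serialize -> compress -> encrypt -> file
val bs = serializerManager.wrapStream(blockId, new FileOutputStream(file))
val objOut = serializerInstance.serializeStream(bs)
objOut.writeObject(record)
objOut.close()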