aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala20
1 files changed, 14 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 748f0a30ad..96b288b9cf 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -171,20 +171,26 @@ private[spark] class SerializerManager(
}
/** Serializes into a chunked byte buffer. */
- def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
- dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+ def dataSerialize[T: ClassTag](
+ blockId: BlockId,
+ values: Iterator[T],
+ allowEncryption: Boolean = true): ChunkedByteBuffer = {
+ dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]],
+ allowEncryption = allowEncryption)
}
/** Serializes into a chunked byte buffer. */
def dataSerializeWithExplicitClassTag(
blockId: BlockId,
values: Iterator[_],
- classTag: ClassTag[_]): ChunkedByteBuffer = {
+ classTag: ClassTag[_],
+ allowEncryption: Boolean = true): ChunkedByteBuffer = {
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
val byteStream = new BufferedOutputStream(bbos)
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = getSerializer(classTag, autoPick).newInstance()
- ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
+ val encrypted = if (allowEncryption) wrapForEncryption(byteStream) else byteStream
+ ser.serializeStream(wrapForCompression(blockId, encrypted)).writeAll(values).close()
bbos.toChunkedByteBuffer
}
@@ -194,13 +200,15 @@ private[spark] class SerializerManager(
*/
def dataDeserializeStream[T](
blockId: BlockId,
- inputStream: InputStream)
+ inputStream: InputStream,
+ maybeEncrypted: Boolean = true)
(classTag: ClassTag[T]): Iterator[T] = {
val stream = new BufferedInputStream(inputStream)
val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val decrypted = if (maybeEncrypted) wrapForEncryption(inputStream) else inputStream
getSerializer(classTag, autoPick)
.newInstance()
- .deserializeStream(wrapStream(blockId, stream))
+ .deserializeStream(wrapForCompression(blockId, decrypted))
.asIterator.asInstanceOf[Iterator[T]]
}
}