diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 748f0a30ad..96b288b9cf 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -171,20 +171,26 @@ private[spark] class SerializerManager( } /** Serializes into a chunked byte buffer. */ - def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = { - dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]]) + def dataSerialize[T: ClassTag]( + blockId: BlockId, + values: Iterator[T], + allowEncryption: Boolean = true): ChunkedByteBuffer = { + dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]], + allowEncryption = allowEncryption) } /** Serializes into a chunked byte buffer. */ def dataSerializeWithExplicitClassTag( blockId: BlockId, values: Iterator[_], - classTag: ClassTag[_]): ChunkedByteBuffer = { + classTag: ClassTag[_], + allowEncryption: Boolean = true): ChunkedByteBuffer = { val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate) val byteStream = new BufferedOutputStream(bbos) val autoPick = !blockId.isInstanceOf[StreamBlockId] val ser = getSerializer(classTag, autoPick).newInstance() - ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close() + val encrypted = if (allowEncryption) wrapForEncryption(byteStream) else byteStream + ser.serializeStream(wrapForCompression(blockId, encrypted)).writeAll(values).close() bbos.toChunkedByteBuffer } @@ -194,13 +200,15 @@ private[spark] class SerializerManager( */ def dataDeserializeStream[T]( blockId: BlockId, - inputStream: InputStream) + inputStream: InputStream, + maybeEncrypted: Boolean = true) (classTag: ClassTag[T]): Iterator[T] = { val stream = new BufferedInputStream(inputStream) val autoPick = !blockId.isInstanceOf[StreamBlockId] + val decrypted = if (maybeEncrypted) wrapForEncryption(inputStream) else inputStream getSerializer(classTag, autoPick) .newInstance() - .deserializeStream(wrapStream(blockId, stream)) + .deserializeStream(wrapForCompression(blockId, decrypted)) .asIterator.asInstanceOf[Iterator[T]] } } |