commit    0169360ef58891ca10a8d64d1c8637c7b873cbdd
tree      8a0e7b1652c7d32bda363ee7cbf4696a1daed608 /core/src
parent    9af8f743b00001f9fdf8813481464c3837331ad9
author    Marcelo Vanzin <vanzin@cloudera.com>  2017-02-13 14:19:41 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>  2017-02-13 14:19:41 -0800
[SPARK-19520][STREAMING] Do not encrypt data written to the WAL.
Spark's I/O encryption uses an ephemeral key for each driver instance.
So driver B cannot decrypt data written by driver A since it doesn't
have the correct key.
The write-ahead log is used for recovery, and thus needs to be readable by
a different driver. So it cannot be encrypted by Spark's I/O encryption
code.
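For reference, I/O encryption is controlled by the `spark.io.encryption.enabled` setting; a minimal, illustrative driver setup (the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

// Each driver that starts with this setting generates its own ephemeral
// encryption key, which is why WAL data must remain independent of it.
val conf = new SparkConf()
  .setAppName("wal-encryption-example")  // illustrative name
  .set("spark.io.encryption.enabled", "true")
```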
The BlockManager APIs the WAL code uses to write data encrypt it
automatically, so changes are needed to let callers opt out of
encryption.
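As a minimal sketch of that opt-out (assuming a `SerializerManager` instance named `serializerManager`, with `blockId` and `records` as placeholders; `allowEncryption` is the parameter added by this change):

```scala
// Serialize records destined for the WAL without applying I/O encryption,
// so that a different driver can read them back during recovery.
val walBytes = serializerManager.dataSerialize(
  blockId,
  records.iterator,
  allowEncryption = false)
```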
Aside from that, the "putBytes" API in the BlockManager does not do
encryption, so a separate problem arose where the WAL would write
unencrypted data to the BM and, when those blocks were read, decryption
would fail. So the WAL code needs to ask the BM to encrypt that data
when encryption is enabled; this code is not optimal since it results
in a (temporary) second copy of the data block in memory, but should be
OK for now until a more performant solution is added. The non-encryption
case should not be affected.
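A sketch of that re-encryption path (assuming a `BlockManager` instance named `blockManager`; `walSegmentBytes` is a placeholder `ByteBuffer`; `encrypt` is the flag added here, and it is a no-op when I/O encryption is disabled):

```scala
// Bytes read back from the unencrypted WAL are re-encrypted on their way
// into the block manager, so the BlockManager read path can decrypt them.
blockManager.putBytes(
  blockId,
  new ChunkedByteBuffer(walSegmentBytes),
  StorageLevel.MEMORY_AND_DISK_SER,
  encrypt = true)
```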
Tested with new unit tests, and by running streaming apps that do
recovery using the WAL data with I/O encryption turned on.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #16862 from vanzin/SPARK-19520.
Diffstat (limited to 'core/src')
3 files changed, 48 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index cde768281f..2480e56b72 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -184,7 +184,7 @@ import org.apache.spark.util.Utils
 
 private[spark] class SecurityManager(
     sparkConf: SparkConf,
-    ioEncryptionKey: Option[Array[Byte]] = None)
+    val ioEncryptionKey: Option[Array[Byte]] = None)
   extends Logging with SecretKeyHolder {
 
   import SecurityManager._
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 748f0a30ad..96b288b9cf 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -171,20 +171,26 @@ private[spark] class SerializerManager(
   }
 
   /** Serializes into a chunked byte buffer. */
-  def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
-    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+  def dataSerialize[T: ClassTag](
+      blockId: BlockId,
+      values: Iterator[T],
+      allowEncryption: Boolean = true): ChunkedByteBuffer = {
+    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]],
+      allowEncryption = allowEncryption)
   }
 
   /** Serializes into a chunked byte buffer. */
   def dataSerializeWithExplicitClassTag(
       blockId: BlockId,
       values: Iterator[_],
-      classTag: ClassTag[_]): ChunkedByteBuffer = {
+      classTag: ClassTag[_],
+      allowEncryption: Boolean = true): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
     val ser = getSerializer(classTag, autoPick).newInstance()
-    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
+    val encrypted = if (allowEncryption) wrapForEncryption(byteStream) else byteStream
+    ser.serializeStream(wrapForCompression(blockId, encrypted)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
 
@@ -194,13 +200,15 @@ private[spark] class SerializerManager(
    */
   def dataDeserializeStream[T](
       blockId: BlockId,
-      inputStream: InputStream)
+      inputStream: InputStream,
+      maybeEncrypted: Boolean = true)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val decrypted = if (maybeEncrypted) wrapForEncryption(inputStream) else inputStream
     getSerializer(classTag, autoPick)
       .newInstance()
-      .deserializeStream(wrapStream(blockId, stream))
+      .deserializeStream(wrapForCompression(blockId, decrypted))
       .asIterator.asInstanceOf[Iterator[T]]
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c40186756f..6946a98cdd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -28,6 +28,8 @@ import scala.reflect.ClassTag
 import scala.util.Random
 import scala.util.control.NonFatal
 
+import com.google.common.io.ByteStreams
+
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.internal.Logging
@@ -38,6 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
@@ -752,15 +755,43 @@ private[spark] class BlockManager(
   /**
    * Put a new block of serialized bytes to the block manager.
    *
+   * @param encrypt If true, asks the block manager to encrypt the data block before storing,
+   *                when I/O encryption is enabled. This is required for blocks that have been
+   *                read from unencrypted sources, since all the BlockManager read APIs
+   *                automatically do decryption.
    * @return true if the block was stored or false if an error occurred.
    */
   def putBytes[T: ClassTag](
       blockId: BlockId,
       bytes: ChunkedByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Boolean = {
+      tellMaster: Boolean = true,
+      encrypt: Boolean = false): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
+
+    val bytesToStore =
+      if (encrypt && securityManager.ioEncryptionKey.isDefined) {
+        try {
+          val data = bytes.toByteBuffer
+          val in = new ByteBufferInputStream(data, true)
+          val byteBufOut = new ByteBufferOutputStream(data.remaining())
+          val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+            securityManager.ioEncryptionKey.get)
+          try {
+            ByteStreams.copy(in, out)
+          } finally {
+            in.close()
+            out.close()
+          }
+          new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+        } finally {
+          bytes.dispose()
+        }
+      } else {
+        bytes
+      }
+
+    doPutBytes(blockId, bytesToStore, level, implicitly[ClassTag[T]], tellMaster)
   }
 
  /**
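For completeness, a hedged sketch of the matching read-side call (the real streaming-side callers live outside 'core/src'; `walInputStream` and `elemClassTag` are placeholders): data that came from the WAL was never encrypted, so decryption must be skipped when deserializing it.

```scala
// Deserialize WAL data with decryption disabled; maybeEncrypted defaults
// to true for all other (BlockManager-managed) inputs.
val recovered = serializerManager.dataDeserializeStream(
  blockId,
  walInputStream,
  maybeEncrypted = false)(elemClassTag)
```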