diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c40186756f..6946a98cdd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,6 +28,8 @@ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal +import com.google.common.io.ByteStreams + import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging @@ -38,6 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExternalShuffleClient import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.rpc.RpcEnv +import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ @@ -752,15 +755,43 @@ private[spark] class BlockManager( /** * Put a new block of serialized bytes to the block manager. * + * @param encrypt If true, asks the block manager to encrypt the data block before storing, + * when I/O encryption is enabled. This is required for blocks that have been + * read from unencrypted sources, since all the BlockManager read APIs + * automatically do decryption. * @return true if the block was stored or false if an error occurred. */ def putBytes[T: ClassTag]( blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel, - tellMaster: Boolean = true): Boolean = { + tellMaster: Boolean = true, + encrypt: Boolean = false): Boolean = { require(bytes != null, "Bytes is null") - doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster) + + val bytesToStore = + if (encrypt && securityManager.ioEncryptionKey.isDefined) { + try { + val data = bytes.toByteBuffer + val in = new ByteBufferInputStream(data, true) + val byteBufOut = new ByteBufferOutputStream(data.remaining()) + val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf, + securityManager.ioEncryptionKey.get) + try { + ByteStreams.copy(in, out) + } finally { + in.close() + out.close() + } + new ChunkedByteBuffer(byteBufOut.toByteBuffer) + } finally { + bytes.dispose() + } + } else { + bytes + } + + doPutBytes(blockId, bytesToStore, level, implicitly[ClassTag[T]], tellMaster) } /** |