aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala35
1 files changed, 33 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c40186756f..6946a98cdd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -28,6 +28,8 @@ import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal
+import com.google.common.io.ByteStreams
+
import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.Logging
@@ -38,6 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
@@ -752,15 +755,43 @@ private[spark] class BlockManager(
/**
* Put a new block of serialized bytes to the block manager.
*
+ * @param encrypt If true, asks the block manager to encrypt the data block before storing,
+ * when I/O encryption is enabled. This is required for blocks that have been
+ * read from unencrypted sources, since all the BlockManager read APIs
+ * automatically do decryption.
* @return true if the block was stored or false if an error occurred.
*/
def putBytes[T: ClassTag](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
- tellMaster: Boolean = true): Boolean = {
+ tellMaster: Boolean = true,
+ encrypt: Boolean = false): Boolean = {
require(bytes != null, "Bytes is null")
- doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
+
+ val bytesToStore =
+ if (encrypt && securityManager.ioEncryptionKey.isDefined) {
+ try {
+ val data = bytes.toByteBuffer
+ val in = new ByteBufferInputStream(data, true)
+ val byteBufOut = new ByteBufferOutputStream(data.remaining())
+ val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+ securityManager.ioEncryptionKey.get)
+ try {
+ ByteStreams.copy(in, out)
+ } finally {
+ in.close()
+ out.close()
+ }
+ new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+ } finally {
+ bytes.dispose()
+ }
+ } else {
+ bytes
+ }
+
+ doPutBytes(blockId, bytesToStore, level, implicitly[ClassTag[T]], tellMaster)
}
/**