aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala11
2 files changed, 13 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 0b2ec29813..d0864fd367 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
/**
@@ -158,13 +158,16 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
- blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
+ blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel,
+ encrypt = true)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
serializerManager
.dataDeserializeStream(
- blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag)
+ blockId,
+ new ChunkedByteBuffer(dataRead).toInputStream(),
+ maybeEncrypted = false)(elementClassTag)
.asInstanceOf[Iterator[T]]
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 80c07958b4..2b488038f0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -87,7 +87,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(
- blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)
+ blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true,
+ encrypt = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
@@ -175,10 +176,11 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
- serializerManager.dataSerialize(blockId, arrayBuffer.iterator)
+ serializerManager.dataSerialize(blockId, arrayBuffer.iterator, allowEncryption = false)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
- val serializedBlock = serializerManager.dataSerialize(blockId, countIterator)
+ val serializedBlock = serializerManager.dataSerialize(blockId, countIterator,
+ allowEncryption = false)
numRecords = countIterator.count
serializedBlock
case ByteBufferBlock(byteBuffer) =>
@@ -193,7 +195,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
blockId,
serializedBlock,
effectiveStorageLevel,
- tellMaster = true)
+ tellMaster = true,
+ encrypt = true)
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")