diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2017-03-29 20:27:41 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-29 20:27:41 +0800 |
commit | b56ad2b1ec19fd60fa9d4926d12244fd3f56aca4 (patch) | |
tree | da4c6117196cbcccd8f94469c7ed322aef474ca8 /streaming/src | |
parent | 9712bd3954c029de5c828f27b57d46e4a6325a38 (diff) | |
download | spark-b56ad2b1ec19fd60fa9d4926d12244fd3f56aca4.tar.gz spark-b56ad2b1ec19fd60fa9d4926d12244fd3f56aca4.tar.bz2 spark-b56ad2b1ec19fd60fa9d4926d12244fd3f56aca4.zip |
[SPARK-19556][CORE] Do not encrypt block manager data in memory.
This change modifies the way block data is encrypted to make the more
common cases faster, while penalizing an edge case. As a side effect
of the change, all data that goes through the block manager is now
encrypted only when needed, including the previous path (broadcast
variables) where that did not happen.
The way the change works is by not encrypting data that is stored in
memory; so if a serialized block is in memory, it will only be encrypted
once it is evicted to disk.
The penalty comes when transferring that encrypted data from disk. If the
data ends up in memory again, it is as efficient as before; but if the
evicted block needs to be transferred directly to a remote executor, then
there's now a performance penalty, since the code now uses a custom
FileRegion implementation to decrypt the data before transferring.
This also means that block data transferred between executors now is
not encrypted (and thus relies on the network library encryption support
for secrecy). Shuffle blocks are still transferred in encrypted form,
since they're handled in a slightly different way by the code. This also
keeps compatibility with existing external shuffle services, which transfer
encrypted shuffle blocks, and avoids having to make the external service
aware of encryption at all.
The serialization and deserialization APIs in the SerializerManager now
do not do encryption automatically; callers need to explicitly wrap their
streams with an appropriate crypto stream before using those.
As a result of these changes, some of the workarounds added in SPARK-19520
are removed here.
Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
way to run a test twice, with encryption on and off; broadcast, block manager
and caching tests were modified to use this new trait so that the existing
tests exercise both encrypted and non-encrypted paths. I also ran some
applications with encryption turned on to verify that they still work,
including streaming tests that failed without the fix for SPARK-19520.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #17295 from vanzin/SPARK-19556.
Diffstat (limited to 'streaming/src')
4 files changed, 9 insertions, 16 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index d0864fd367..844760ab61 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -158,16 +158,14 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logInfo(s"Read partition data of $this from write ahead log, record handle " + partition.walRecordHandle) if (storeInBlockManager) { - blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel, - encrypt = true) + blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel) logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } serializerManager .dataDeserializeStream( blockId, - new ChunkedByteBuffer(dataRead).toInputStream(), - maybeEncrypted = false)(elementClassTag) + new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag) .asInstanceOf[Iterator[T]] } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 2b488038f0..80c07958b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -87,8 +87,7 @@ private[streaming] class BlockManagerBasedBlockHandler( putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes( - blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true, - encrypt = true) + blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") @@ -176,11 +175,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler( val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) - serializerManager.dataSerialize(blockId, arrayBuffer.iterator, allowEncryption = false) + serializerManager.dataSerialize(blockId, arrayBuffer.iterator) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) - val serializedBlock = serializerManager.dataSerialize(blockId, countIterator, - allowEncryption = false) + val serializedBlock = serializerManager.dataSerialize(blockId, countIterator) numRecords = countIterator.count serializedBlock case ByteBufferBlock(byteBuffer) => @@ -195,8 +193,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( blockId, serializedBlock, effectiveStorageLevel, - tellMaster = true, - encrypt = true) + tellMaster = true) if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index c2b0389b8c..3c4a2716ca 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -175,8 +175,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) reader.close() serializerManager.dataDeserializeStream( generateBlockId(), - new ChunkedByteBuffer(bytes).toInputStream(), - maybeEncrypted = false)(ClassTag.Any).toList + new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList } loggedData shouldEqual data } @@ -357,7 +356,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) } def dataToByteBuffer(b: Seq[String]) = - serializerManager.dataSerialize(generateBlockId, b.iterator, allowEncryption = false) + serializerManager.dataSerialize(generateBlockId, b.iterator) val blocks = data.grouped(10).toSeq diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 2ac0dc9691..aa69be7ca9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -250,8 +250,7 @@ class WriteAheadLogBackedBlockRDDSuite require(blockData.size === blockIds.size) val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => - writer.write(serializerManager.dataSerialize(id, data.iterator, allowEncryption = false) - .toByteBuffer) + writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer) } writer.close() segments |