diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2017-02-13 14:19:41 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2017-02-13 14:19:41 -0800 |
commit | 0169360ef58891ca10a8d64d1c8637c7b873cbdd (patch) | |
tree | 8a0e7b1652c7d32bda363ee7cbf4696a1daed608 /streaming/src | |
parent | 9af8f743b00001f9fdf8813481464c3837331ad9 (diff) | |
download | spark-0169360ef58891ca10a8d64d1c8637c7b873cbdd.tar.gz spark-0169360ef58891ca10a8d64d1c8637c7b873cbdd.tar.bz2 spark-0169360ef58891ca10a8d64d1c8637c7b873cbdd.zip |
[SPARK-19520][STREAMING] Do not encrypt data written to the WAL.
Spark's I/O encryption uses an ephemeral key for each driver instance.
So driver B cannot decrypt data written by driver A since it doesn't
have the correct key.
The write ahead log is used for recovery, thus needs to be readable by
a different driver. So it cannot be encrypted by Spark's I/O encryption
code.
The BlockManager APIs used by the WAL code to write the data automatically
encrypt data, so changes are needed so that callers can to opt out of
encryption.
Aside from that, the "putBytes" API in the BlockManager does not do
encryption, so a separate situation arised where the WAL would write
unencrypted data to the BM and, when those blocks were read, decryption
would fail. So the WAL code needs to ask the BM to encrypt that data
when encryption is enabled; this code is not optimal since it results
in a (temporary) second copy of the data block in memory, but should be
OK for now until a more performant solution is added. The non-encryption
case should not be affected.
Tested with new unit tests, and by running streaming apps that do
recovery using the WAL data with I/O encryption turned on.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #16862 from vanzin/SPARK-19520.
Diffstat (limited to 'streaming/src')
4 files changed, 69 insertions, 21 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 0b2ec29813..d0864fd367 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -27,7 +27,7 @@ import org.apache.spark._ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.util._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer /** @@ -158,13 +158,16 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logInfo(s"Read partition data of $this from write ahead log, record handle " + partition.walRecordHandle) if (storeInBlockManager) { - blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel) + blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel, + encrypt = true) logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } serializerManager .dataDeserializeStream( - blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag) + blockId, + new ChunkedByteBuffer(dataRead).toInputStream(), + maybeEncrypted = false)(elementClassTag) .asInstanceOf[Iterator[T]] } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 80c07958b4..2b488038f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -87,7 +87,8 @@ private[streaming] class BlockManagerBasedBlockHandler( putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes( - blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true) + blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true, + encrypt = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") @@ -175,10 +176,11 @@ private[streaming] class WriteAheadLogBasedBlockHandler( val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) - serializerManager.dataSerialize(blockId, arrayBuffer.iterator) + serializerManager.dataSerialize(blockId, arrayBuffer.iterator, allowEncryption = false) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) - val serializedBlock = serializerManager.dataSerialize(blockId, countIterator) + val serializedBlock = serializerManager.dataSerialize(blockId, countIterator, + allowEncryption = false) numRecords = countIterator.count serializedBlock case ByteBufferBlock(byteBuffer) => @@ -193,7 +195,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler( blockId, serializedBlock, effectiveStorageLevel, - tellMaster = true) + tellMaster = true, + encrypt = true) if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index f224193600..c2b0389b8c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -32,10 +32,12 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage._ @@ -44,7 +46,7 @@ import org.apache.spark.streaming.util._ import org.apache.spark.util.{ManualClock, Utils} import org.apache.spark.util.io.ChunkedByteBuffer -class ReceivedBlockHandlerSuite +abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) extends SparkFunSuite with BeforeAndAfter with Matchers @@ -57,14 +59,22 @@ class ReceivedBlockHandlerSuite val conf = new SparkConf() .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") .set("spark.app.id", "streaming-test") + .set(IO_ENCRYPTION_ENABLED, enableEncryption) + val encryptionKey = + if (enableEncryption) { + Some(CryptoStreamUtils.createKey(conf)) + } else { + None + } + val hadoopConf = new Configuration() val streamId = 1 - val securityMgr = new SecurityManager(conf) + val securityMgr = new SecurityManager(conf, encryptionKey) val broadcastManager = new BroadcastManager(true, conf, securityMgr) val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) val shuffleManager = new SortShuffleManager(conf) val serializer = new KryoSerializer(conf) - var serializerManager = new SerializerManager(serializer, conf) + var serializerManager = new SerializerManager(serializer, conf, encryptionKey) val manualClock = new ManualClock val blockManagerSize = 10000000 val blockManagerBuffer = new ArrayBuffer[BlockManager]() @@ -164,7 +174,9 @@ class ReceivedBlockHandlerSuite val bytes = reader.read(fileSegment) reader.close() serializerManager.dataDeserializeStream( - generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList + generateBlockId(), + new ChunkedByteBuffer(bytes).toInputStream(), + maybeEncrypted = false)(ClassTag.Any).toList } loggedData shouldEqual data } @@ -208,6 +220,8 @@ class ReceivedBlockHandlerSuite sparkConf.set("spark.storage.unrollMemoryThreshold", "512") // spark.storage.unrollFraction set to 0.4 for BlockManager sparkConf.set("spark.storage.unrollFraction", "0.4") + + sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption) // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll blockManager = createBlockManager(12000, sparkConf) @@ -343,7 +357,7 @@ class ReceivedBlockHandlerSuite } def dataToByteBuffer(b: Seq[String]) = - serializerManager.dataSerialize(generateBlockId, b.iterator) + serializerManager.dataSerialize(generateBlockId, b.iterator, allowEncryption = false) val blocks = data.grouped(10).toSeq @@ -418,3 +432,6 @@ class ReceivedBlockHandlerSuite private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong) } +class ReceivedBlockHandlerSuite extends BaseReceivedBlockHandlerSuite(false) + +class ReceivedBlockHandlerWithEncryptionSuite extends BaseReceivedBlockHandlerSuite(true) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index c5e695a33a..2ac0dc9691 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.internal.config._ import org.apache.spark.serializer.SerializerManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter} @@ -45,6 +46,7 @@ class WriteAheadLogBackedBlockRDDSuite override def beforeEach(): Unit = { super.beforeEach() + initSparkContext() dir = Utils.createTempDir() } @@ -56,22 +58,33 @@ class WriteAheadLogBackedBlockRDDSuite } } - override def beforeAll(): Unit = { - super.beforeAll() - sparkContext = new SparkContext(conf) - blockManager = sparkContext.env.blockManager - serializerManager = sparkContext.env.serializerManager + override def afterAll(): Unit = { + try { + stopSparkContext() + } finally { + super.afterAll() + } } - override def afterAll(): Unit = { + private def initSparkContext(_conf: Option[SparkConf] = None): Unit = { + if (sparkContext == null) { + sparkContext = new SparkContext(_conf.getOrElse(conf)) + blockManager = sparkContext.env.blockManager + serializerManager = sparkContext.env.serializerManager + } + } + + private def stopSparkContext(): Unit = { // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests. try { - sparkContext.stop() + if (sparkContext != null) { + sparkContext.stop() + } System.clearProperty("spark.driver.port") blockManager = null serializerManager = null } finally { - super.afterAll() + sparkContext = null } } @@ -106,6 +119,17 @@ class WriteAheadLogBackedBlockRDDSuite numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true) } + test("read data in block manager and WAL with encryption on") { + stopSparkContext() + try { + val testConf = conf.clone().set(IO_ENCRYPTION_ENABLED, true) + initSparkContext(Some(testConf)) + testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2) + } finally { + stopSparkContext() + } + } + /** * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager * and the rest to a write ahead log, and then reading it all back using the RDD. @@ -226,7 +250,8 @@ class WriteAheadLogBackedBlockRDDSuite require(blockData.size === blockIds.size) val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => - writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer) + writer.write(serializerManager.dataSerialize(id, data.iterator, allowEncryption = false) + .toByteBuffer) } writer.close() segments |