diff options
Diffstat (limited to 'streaming/src/test/scala')
2 files changed, 56 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index f224193600..c2b0389b8c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -32,10 +32,12 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage._ @@ -44,7 +46,7 @@ import org.apache.spark.streaming.util._ import org.apache.spark.util.{ManualClock, Utils} import org.apache.spark.util.io.ChunkedByteBuffer -class ReceivedBlockHandlerSuite +abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) extends SparkFunSuite with BeforeAndAfter with Matchers @@ -57,14 +59,22 @@ class ReceivedBlockHandlerSuite val conf = new SparkConf() .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") .set("spark.app.id", "streaming-test") + .set(IO_ENCRYPTION_ENABLED, enableEncryption) + val encryptionKey = + if (enableEncryption) { + Some(CryptoStreamUtils.createKey(conf)) + } else { + None + } + val hadoopConf = new Configuration() val streamId = 1 - val securityMgr = new SecurityManager(conf) + val securityMgr = new SecurityManager(conf, encryptionKey) val broadcastManager = new BroadcastManager(true, conf, securityMgr) val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) val shuffleManager = new SortShuffleManager(conf) val serializer = new KryoSerializer(conf) - var serializerManager = new SerializerManager(serializer, conf) + var serializerManager = new SerializerManager(serializer, conf, encryptionKey) val manualClock = new ManualClock val blockManagerSize = 10000000 val blockManagerBuffer = new ArrayBuffer[BlockManager]() @@ -164,7 +174,9 @@ class ReceivedBlockHandlerSuite val bytes = reader.read(fileSegment) reader.close() serializerManager.dataDeserializeStream( - generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList + generateBlockId(), + new ChunkedByteBuffer(bytes).toInputStream(), + maybeEncrypted = false)(ClassTag.Any).toList } loggedData shouldEqual data } @@ -208,6 +220,8 @@ class ReceivedBlockHandlerSuite sparkConf.set("spark.storage.unrollMemoryThreshold", "512") // spark.storage.unrollFraction set to 0.4 for BlockManager sparkConf.set("spark.storage.unrollFraction", "0.4") + + sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption) // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll blockManager = createBlockManager(12000, sparkConf) @@ -343,7 +357,7 @@ class ReceivedBlockHandlerSuite } def dataToByteBuffer(b: Seq[String]) = - serializerManager.dataSerialize(generateBlockId, b.iterator) + serializerManager.dataSerialize(generateBlockId, b.iterator, allowEncryption = false) val blocks = data.grouped(10).toSeq @@ -418,3 +432,6 @@ class ReceivedBlockHandlerSuite private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong) } +class ReceivedBlockHandlerSuite extends BaseReceivedBlockHandlerSuite(false) + +class ReceivedBlockHandlerWithEncryptionSuite extends BaseReceivedBlockHandlerSuite(true) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index c5e695a33a..2ac0dc9691 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.internal.config._ import org.apache.spark.serializer.SerializerManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter} @@ -45,6 +46,7 @@ class WriteAheadLogBackedBlockRDDSuite override def beforeEach(): Unit = { super.beforeEach() + initSparkContext() dir = Utils.createTempDir() } @@ -56,22 +58,33 @@ class WriteAheadLogBackedBlockRDDSuite } } - override def beforeAll(): Unit = { - super.beforeAll() - sparkContext = new SparkContext(conf) - blockManager = sparkContext.env.blockManager - serializerManager = sparkContext.env.serializerManager + override def afterAll(): Unit = { + try { + stopSparkContext() + } finally { + super.afterAll() + } } - override def afterAll(): Unit = { + private def initSparkContext(_conf: Option[SparkConf] = None): Unit = { + if (sparkContext == null) { + sparkContext = new SparkContext(_conf.getOrElse(conf)) + blockManager = sparkContext.env.blockManager + serializerManager = sparkContext.env.serializerManager + } + } + + private def stopSparkContext(): Unit = { // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests. try { - sparkContext.stop() + if (sparkContext != null) { + sparkContext.stop() + } System.clearProperty("spark.driver.port") blockManager = null serializerManager = null } finally { - super.afterAll() + sparkContext = null } } @@ -106,6 +119,17 @@ class WriteAheadLogBackedBlockRDDSuite numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true) } + test("read data in block manager and WAL with encryption on") { + stopSparkContext() + try { + val testConf = conf.clone().set(IO_ENCRYPTION_ENABLED, true) + initSparkContext(Some(testConf)) + testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2) + } finally { + stopSparkContext() + } + } + /** * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager * and the rest to a write ahead log, and then reading it all back using the RDD. @@ -226,7 +250,8 @@ class WriteAheadLogBackedBlockRDDSuite require(blockData.size === blockIds.size) val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => - writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer) + writer.write(serializerManager.dataSerialize(id, data.iterator, allowEncryption = false) + .toByteBuffer) } writer.close() segments |