diff options
Diffstat (limited to 'extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala')
-rw-r--r-- | extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala | 259 |
1 files changed, 0 insertions, 259 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala deleted file mode 100644 index 2555332d22..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException} -import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} - -abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) - extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext { - - private val testData = 1 to 8 - - private var testUtils: KinesisTestUtils = null - private var shardIds: Seq[String] = null - private var shardIdToData: Map[String, Seq[Int]] = null - private var shardIdToSeqNumbers: Map[String, Seq[String]] = null - private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null - private var shardIdToRange: Map[String, SequenceNumberRange] = null - private var allRanges: Seq[SequenceNumberRange] = null - - private var blockManager: BlockManager = null - - override def beforeAll(): Unit = { - super.beforeAll() - runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KPLBasedKinesisTestUtils() - testUtils.createStream() - - shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData) - require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") - - shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq - shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }} - shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} - shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => - val seqNumRange = SequenceNumberRange( - testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) - (shardId, seqNumRange) - } - allRanges = shardIdToRange.values.toSeq - } - } - - override def beforeEach(): Unit = { - super.beforeEach() - val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") - sc = new SparkContext(conf) - blockManager = sc.env.blockManager - } - - override def afterAll(): Unit = { - try { - if (testUtils != null) { - testUtils.deleteStream() - } - } finally { - super.afterAll() - } - } - - testIfEnabled("Basic reading from Kinesis") { - // Verify all data using multiple ranges in a single RDD partition - val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(1), - Array(SequenceNumberRanges(allRanges.toArray)) - ).map { bytes => new String(bytes).toInt }.collect() - assert(receivedData1.toSet === testData.toSet) - - // Verify all data using one range in each of the multiple RDD partitions - val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(allRanges.size), - allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray - ).map { bytes => new String(bytes).toInt }.collect() - assert(receivedData2.toSet === testData.toSet) - - // Verify ordering within each partition - val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(allRanges.size), - allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray - ).map { bytes => new String(bytes).toInt }.collectPartitions() - assert(receivedData3.length === allRanges.size) - for (i <- 0 until allRanges.size) { - assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId)) - } - } - - testIfEnabled("Read data available in both block manager and Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2) - } - - testIfEnabled("Read data available only in block manager, not in Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0) - } - - testIfEnabled("Read data available only in Kinesis, not in block manager") { - testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2) - } - - testIfEnabled("Read data available partially in block manager, rest in Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1) - } - - testIfEnabled("Test isBlockValid skips block fetching from block manager") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0, - testIsBlockValid = true) - } - - testIfEnabled("Test whether RDD is valid after removing blocks from block anager") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2, - testBlockRemove = true) - } - - /** - * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager - * and the rest to a write ahead log, and then reading reading it all back using the RDD. - * It can also test if the partitions that were read from the log were again stored in - * block manager. - * - * - * - * @param numPartitions Number of partitions in RDD - * @param numPartitionsInBM Number of partitions to write to the BlockManager. - * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager - * @param numPartitionsInKinesis Number of partitions to write to the Kinesis. - * Partitions (numPartitions - 1 - numPartitionsInKinesis) to - * (numPartitions - 1) will be written to Kinesis - * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching - * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with - * reads falling back to the WAL - * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4 - * - * numPartitionsInBM = 3 - * |------------------| - * | | - * 0 1 2 3 4 - * | | - * |-------------------------| - * numPartitionsInKinesis = 4 - */ - private def testRDD( - numPartitions: Int, - numPartitionsInBM: Int, - numPartitionsInKinesis: Int, - testIsBlockValid: Boolean = false, - testBlockRemove: Boolean = false - ): Unit = { - require(shardIds.size > 1, "Need at least 2 shards to test") - require(numPartitionsInBM <= shardIds.size, - "Number of partitions in BlockManager cannot be more than the Kinesis test shards available") - require(numPartitionsInKinesis <= shardIds.size, - "Number of partitions in Kinesis cannot be more than the Kinesis test shards available") - require(numPartitionsInBM <= numPartitions, - "Number of partitions in BlockManager cannot be more than that in RDD") - require(numPartitionsInKinesis <= numPartitions, - "Number of partitions in Kinesis cannot be more than that in RDD") - - // Put necessary blocks in the block manager - val blockIds = fakeBlockIds(numPartitions) - blockIds.foreach(blockManager.removeBlock(_)) - (0 until numPartitionsInBM).foreach { i => - val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() } - blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY) - } - - // Create the necessary ranges to use in the RDD - val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)( - SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))) - val realRanges = Array.tabulate(numPartitionsInKinesis) { i => - val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis))) - SequenceNumberRanges(Array(range)) - } - val ranges = (fakeRanges ++ realRanges) - - - // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not - require( - blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty), - "Expected blocks not in BlockManager" - ) - - require( - blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty), - "Unexpected blocks in BlockManager" - ) - - // Make sure that the right sequence `numPartitionsInKinesis` are configured, and others are not - require( - ranges.takeRight(numPartitionsInKinesis).forall { - _.ranges.forall { _.streamName == testUtils.streamName } - }, "Incorrect configuration of RDD, expected ranges not set: " - ) - - require( - ranges.dropRight(numPartitionsInKinesis).forall { - _.ranges.forall { _.streamName != testUtils.streamName } - }, "Incorrect configuration of RDD, unexpected ranges set" - ) - - val rdd = new KinesisBackedBlockRDD[Array[Byte]]( - sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges) - val collectedData = rdd.map { bytes => - new String(bytes).toInt - }.collect() - assert(collectedData.toSet === testData.toSet) - - // Verify that the block fetching is skipped when isBlockValid is set to false. - // This is done by using a RDD whose data is only in memory but is set to skip block fetching - // Using that RDD will throw exception, as it skips block fetching even if the blocks are in - // in BlockManager. - if (testIsBlockValid) { - require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager") - require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis") - val rdd2 = new KinesisBackedBlockRDD[Array[Byte]]( - sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges, - isBlockIdValid = Array.fill(blockIds.length)(false)) - intercept[SparkException] { - rdd2.collect() - } - } - - // Verify that the RDD is not invalid after the blocks are removed and can still read data - // from write ahead log - if (testBlockRemove) { - require(numPartitions === numPartitionsInKinesis, - "All partitions must be in WAL for this test") - require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test") - rdd.removeBlocks() - assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet) - } - } - - /** Generate fake block ids */ - private def fakeBlockIds(num: Int): Array[BlockId] = { - Array.tabulate(num) { i => new StreamBlockId(0, i) } - } -} - -class WithAggregationKinesisBackedBlockRDDSuite - extends KinesisBackedBlockRDDTests(aggregateTestData = true) - -class WithoutAggregationKinesisBackedBlockRDDSuite - extends KinesisBackedBlockRDDTests(aggregateTestData = false) |