author     Tathagata Das <tathagata.das1565@gmail.com>  2015-07-23 20:06:54 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-07-23 20:06:54 -0700
commit     d249636e59fabd8ca57a47dc2cbad9c4a4e7a750 (patch)
tree       b26062dec22c2c6156993d4398e427ee62b74f55 /extras/kinesis-asl/src/test
parent     52de3acca4ce8c36fd4c9ce162473a091701bbc7 (diff)
download   spark-d249636e59fabd8ca57a47dc2cbad9c4a4e7a750.tar.gz
           spark-d249636e59fabd8ca57a47dc2cbad9c4a4e7a750.tar.bz2
           spark-d249636e59fabd8ca57a47dc2cbad9c4a4e7a750.zip
[SPARK-9216] [STREAMING] Define KinesisBackedBlockRDDs
For more information see master JIRA: https://issues.apache.org/jira/browse/SPARK-9215
Design Doc: https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7578 from tdas/kinesis-rdd and squashes the following commits:

543d208 [Tathagata Das] Fixed scala style
5082a30 [Tathagata Das] Fixed scala style
3f40c2d [Tathagata Das] Addressed comments
c4f25d2 [Tathagata Das] Addressed comment
d3d64d1 [Tathagata Das] Minor update
f6e35c8 [Tathagata Das] Added retry logic to make it more robust
8874b70 [Tathagata Das] Updated Kinesis RDD
575bdbc [Tathagata Das] Fix scala style issues
4a36096 [Tathagata Das] Add license
5da3995 [Tathagata Das] Changed KinesisSuiteHelper to KinesisFunSuite
528e206 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into kinesis-rdd
3ae0814 [Tathagata Das] Added KinesisBackedBlockRDD
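To show what the new RDD looks like from a caller's point of view, below is a minimal sketch pieced together from how the test suite in this diff constructs KinesisBackedBlockRDD. The stream name, shard id, sequence numbers, and the object name are placeholders invented for illustration; the authoritative constructor signature is in the KinesisBackedBlockRDD source added by this commit, which is not part of this diff.

    package org.apache.spark.streaming.kinesis

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.{BlockId, StreamBlockId}

    object KinesisBackedBlockRDDSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSketch"))

        // One block id per RDD partition; the corresponding block may or may not
        // still be present in the BlockManager.
        val blockIds: Array[BlockId] = Array(new StreamBlockId(0, 0))

        // For each partition, the sequence number range(s) to replay from Kinesis
        // when the block is not found locally. All values here are placeholders.
        val ranges = Array(SequenceNumberRanges(
          SequenceNumberRange("my-stream", "shardId-000000000000", "<fromSeqNumber>", "<toSeqNumber>")))

        val rdd = new KinesisBackedBlockRDD(
          sc, "us-east-1", "https://kinesis.us-east-1.amazonaws.com", blockIds, ranges)

        // Records come back as raw byte arrays, exactly as the suite below consumes them.
        rdd.map(bytes => new String(bytes)).collect().foreach(println)
        sc.stop()
      }
    }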
Diffstat (limited to 'extras/kinesis-asl/src/test')
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala  246
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala  13
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala  4
3 files changed, 259 insertions, 4 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
new file mode 100644
index 0000000000..b2e2a4246d
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+
+class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
+
+ private val regionId = "us-east-1"
+ private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
+ private val testData = 1 to 8
+
+ private var testUtils: KinesisTestUtils = null
+ private var shardIds: Seq[String] = null
+ private var shardIdToData: Map[String, Seq[Int]] = null
+ private var shardIdToSeqNumbers: Map[String, Seq[String]] = null
+ private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null
+ private var shardIdToRange: Map[String, SequenceNumberRange] = null
+ private var allRanges: Seq[SequenceNumberRange] = null
+
+ private var sc: SparkContext = null
+ private var blockManager: BlockManager = null
+
+
+ override def beforeAll(): Unit = {
+ runIfTestsEnabled("Prepare KinesisTestUtils") {
+ testUtils = new KinesisTestUtils(endpointUrl)
+ testUtils.createStream()
+
+ shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
+ require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
+
+ shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
+ shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
+ shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
+ shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
+ val seqNumRange = SequenceNumberRange(
+ testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
+ (shardId, seqNumRange)
+ }
+ allRanges = shardIdToRange.values.toSeq
+
+ val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
+ sc = new SparkContext(conf)
+ blockManager = sc.env.blockManager
+ }
+ }
+
+ override def afterAll(): Unit = {
+ if (sc != null) {
+ sc.stop()
+ }
+ }
+
+ testIfEnabled("Basic reading from Kinesis") {
+ // Verify all data using multiple ranges in a single RDD partition
+ val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+ fakeBlockIds(1),
+ Array(SequenceNumberRanges(allRanges.toArray))
+ ).map { bytes => new String(bytes).toInt }.collect()
+ assert(receivedData1.toSet === testData.toSet)
+
+ // Verify all data using one range in each of the multiple RDD partitions
+ val receivedData2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+ fakeBlockIds(allRanges.size),
+ allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
+ ).map { bytes => new String(bytes).toInt }.collect()
+ assert(receivedData2.toSet === testData.toSet)
+
+ // Verify ordering within each partition
+ val receivedData3 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+ fakeBlockIds(allRanges.size),
+ allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
+ ).map { bytes => new String(bytes).toInt }.collectPartitions()
+ assert(receivedData3.length === allRanges.size)
+ for (i <- 0 until allRanges.size) {
+ assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
+ }
+ }
+
+ testIfEnabled("Read data available in both block manager and Kinesis") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2)
+ }
+
+ testIfEnabled("Read data available only in block manager, not in Kinesis") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0)
+ }
+
+ testIfEnabled("Read data available only in Kinesis, not in block manager") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2)
+ }
+
+ testIfEnabled("Read data available partially in block manager, rest in Kinesis") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1)
+ }
+
+ testIfEnabled("Test isBlockValid skips block fetching from block manager") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0,
+ testIsBlockValid = true)
+ }
+
+ testIfEnabled("Test whether RDD is valid after removing blocks from block anager") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2,
+ testBlockRemove = true)
+ }
+
+ /**
+ * Test the KinesisBackedBlockRDD, by writing some partitions of the data to the block manager
+ * and the rest to a Kinesis stream, and then reading it all back using the RDD.
+ * It can also test whether the partitions that were read from Kinesis were again stored in
+ * the block manager.
+ *
+ * @param numPartitions Number of partitions in RDD
+ * @param numPartitionsInBM Number of partitions to write to the BlockManager.
+ * Partitions 0 to (numPartitionsInBM - 1) will be written to BlockManager
+ * @param numPartitionsInKinesis Number of partitions to write to Kinesis.
+ * Partitions (numPartitions - numPartitionsInKinesis) to
+ * (numPartitions - 1) will be written to Kinesis
+ * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
+ * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
+ * reads falling back to Kinesis
+ *
+ * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInKinesis = 4:
+ *
+ *      numPartitionsInBM = 3
+ *   |--------------|
+ *   |              |
+ *   0      1      2      3      4
+ *          |                    |
+ *          |--------------------|
+ *           numPartitionsInKinesis = 4
+ */
+ private def testRDD(
+ numPartitions: Int,
+ numPartitionsInBM: Int,
+ numPartitionsInKinesis: Int,
+ testIsBlockValid: Boolean = false,
+ testBlockRemove: Boolean = false
+ ): Unit = {
+ require(shardIds.size > 1, "Need at least 2 shards to test")
+ require(numPartitionsInBM <= shardIds.size,
+ "Number of partitions in BlockManager cannot be more than the Kinesis test shards available")
+ require(numPartitionsInKinesis <= shardIds.size,
+ "Number of partitions in Kinesis cannot be more than the Kinesis test shards available")
+ require(numPartitionsInBM <= numPartitions,
+ "Number of partitions in BlockManager cannot be more than that in RDD")
+ require(numPartitionsInKinesis <= numPartitions,
+ "Number of partitions in Kinesis cannot be more than that in RDD")
+
+ // Put necessary blocks in the block manager
+ val blockIds = fakeBlockIds(numPartitions)
+ blockIds.foreach(blockManager.removeBlock(_))
+ (0 until numPartitionsInBM).foreach { i =>
+ val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() }
+ blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY)
+ }
+
+ // Create the necessary ranges to use in the RDD
+ val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
+ SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
+ val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
+ val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
+ SequenceNumberRanges(Array(range))
+ }
+ val ranges = fakeRanges ++ realRanges
+
+ // Make sure that the first `numPartitionsInBM` blocks are in the block manager, and the rest are not
+ require(
+ blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
+ "Expected blocks not in BlockManager"
+ )
+
+ require(
+ blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty),
+ "Unexpected blocks in BlockManager"
+ )
+
+ // Make sure that the last `numPartitionsInKinesis` ranges point to the real stream, and the rest do not
+ require(
+ ranges.takeRight(numPartitionsInKinesis).forall {
+ _.ranges.forall { _.streamName == testUtils.streamName }
+ }, "Incorrect configuration of RDD, expected ranges not set"
+ )
+
+ require(
+ ranges.dropRight(numPartitionsInKinesis).forall {
+ _.ranges.forall { _.streamName != testUtils.streamName }
+ }, "Incorrect configuration of RDD, unexpected ranges set"
+ )
+
+ val rdd = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds, ranges)
+ val collectedData = rdd.map { bytes =>
+ new String(bytes).toInt
+ }.collect()
+ assert(collectedData.toSet === testData.toSet)
+
+ // Verify that block fetching is skipped when isBlockValid is set to false.
+ // This is done by using an RDD whose data is only in memory but which is set to skip block
+ // fetching. Using that RDD will throw an exception, as it skips block fetching even if the
+ // blocks are in the BlockManager.
+ if (testIsBlockValid) {
+ require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
+ require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
+ val rdd2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds.toArray,
+ ranges, isBlockIdValid = Array.fill(blockIds.length)(false))
+ intercept[SparkException] {
+ rdd2.collect()
+ }
+ }
+
+ // Verify that the RDD is still valid after the blocks are removed, and that it can still read
+ // data from Kinesis
+ if (testBlockRemove) {
+ require(numPartitions === numPartitionsInKinesis,
+ "All partitions must be in Kinesis for this test")
+ require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
+ rdd.removeBlocks()
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet)
+ }
+ }
+
+ /** Generate fake block ids */
+ private def fakeBlockIds(num: Int): Array[BlockId] = {
+ Array.tabulate(num) { i => new StreamBlockId(0, i) }
+ }
+}
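To make the layout that testRDD sets up above concrete, the following snippet spells out the index arithmetic for the example in its doc comment (numPartitions = 5, numPartitionsInBM = 3, numPartitionsInKinesis = 4). The values follow directly from how the suite builds the block ids and sequence number ranges; the snippet itself is purely illustrative.

    val numPartitions = 5
    val numPartitionsInBM = 3
    val numPartitionsInKinesis = 4

    // Blocks for the first numPartitionsInBM partitions are put into the BlockManager.
    val inBlockManager = 0 until numPartitionsInBM                                // 0, 1, 2

    // Real sequence number ranges are assigned to the last numPartitionsInKinesis partitions;
    // the earlier partitions get fake ranges.
    val inKinesis = (numPartitions - numPartitionsInKinesis) until numPartitions  // 1, 2, 3, 4

    // So partition 0 can only be served from the BlockManager, partitions 1 and 2 can be
    // served from either source, and partitions 3 and 4 must be re-read from Kinesis.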
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
index 6d011f295e..8373138785 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -23,15 +23,24 @@ import org.apache.spark.SparkFunSuite
 * Helper trait that runs Kinesis real data transfer tests, or
 * ignores them based on whether the relevant environment variable is set.
 */
-trait KinesisSuiteHelper { self: SparkFunSuite =>
+trait KinesisFunSuite extends SparkFunSuite {
import KinesisTestUtils._
/** Run the test if environment variable is set or ignore the test */
- def testOrIgnore(testName: String)(testBody: => Unit) {
+ def testIfEnabled(testName: String)(testBody: => Unit) {
if (shouldRunTests) {
test(testName)(testBody)
} else {
ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
}
}
+
+ /** Run the given body of code only if Kinesis tests are enabled */
+ def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
+ if (shouldRunTests) {
+ body
+ } else {
+ ignore(s"$message [enable by setting env var $envVarName=1]")()
+ }
+ }
}
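For reference, here is a minimal sketch of how a suite might use the renamed helpers. The suite name and the test body are hypothetical; KinesisTestUtils, createStream(), and pushData() are used exactly as in the suites in this diff.

    package org.apache.spark.streaming.kinesis

    import org.scalatest.BeforeAndAfterAll

    class MyKinesisSuite extends KinesisFunSuite with BeforeAndAfterAll {

      private var testUtils: KinesisTestUtils = null

      override def beforeAll(): Unit = {
        // runIfTestsEnabled guards non-test setup code, as KinesisBackedBlockRDDSuite does above.
        runIfTestsEnabled("Prepare KinesisTestUtils") {
          testUtils = new KinesisTestUtils()
          testUtils.createStream()
        }
      }

      // testIfEnabled registers a real test only when Kinesis tests are enabled;
      // otherwise the test is reported as ignored, with a hint about the env var to set.
      testIfEnabled("push some data") {
        testUtils.pushData(1 to 10)
      }
    }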
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 50f71413ab..f9c952b946 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
+class KinesisStreamSuite extends KinesisFunSuite
with Eventually with BeforeAndAfter with BeforeAndAfterAll {
// This is the name that KCL uses to save metadata to DynamoDB
@@ -83,7 +83,7 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
* you must have AWS credentials available through the default AWS provider chain,
* and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
*/
- testOrIgnore("basic operation") {
+ testIfEnabled("basic operation") {
val kinesisTestUtils = new KinesisTestUtils()
try {
kinesisTestUtils.createStream()