diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-19 20:34:30 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-19 20:34:30 -0700 |
commit | 93eb2acfb287807355ba5d77989d239fdd6e2c30 (patch) | |
tree | fcd881495be2c80b08fbd084ffe23ece235de679 | |
parent | 163e3f1df94f6b7d3dadb46a87dbb3a2bade3f95 (diff) | |
download | spark-93eb2acfb287807355ba5d77989d239fdd6e2c30.tar.gz spark-93eb2acfb287807355ba5d77989d239fdd6e2c30.tar.bz2 spark-93eb2acfb287807355ba5d77989d239fdd6e2c30.zip |
[SPARK-9030] [STREAMING] [HOTFIX] Make sure that no attempts to create Kinesis streams are made when not enabled
Problem: Even when the environment variable to enable tests is disabled, the `beforeAll` of the KinesisStreamSuite attempted to find AWS credentials to create a Kinesis stream, and failed.
Solution: Made sure all accesses to KinesisTestUtils that create streams are under `testOrIgnore`
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #7519 from tdas/kinesis-tests and squashes the following commits:
64d6d06 [Tathagata Das] Removed empty lines.
7c18473 [Tathagata Das] Putting all access to KinesisTestUtils inside testOrIgnore
-rw-r--r-- | extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 57 |
1 files changed, 26 insertions, 31 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index d3dd541fe4..50f71413ab 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -33,8 +33,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper with Eventually with BeforeAndAfter with BeforeAndAfterAll { - private val kinesisTestUtils = new KinesisTestUtils() - // This is the name that KCL uses to save metadata to DynamoDB private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" @@ -42,7 +40,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper private var sc: SparkContext = _ override def beforeAll(): Unit = { - kinesisTestUtils.createStream() val conf = new SparkConf() .setMaster("local[4]") .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name @@ -53,15 +50,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper sc.stop() // Delete the Kinesis stream as well as the DynamoDB table generated by // Kinesis Client Library when consuming the stream - kinesisTestUtils.deleteStream() - kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) - } - - before { - // Delete the DynamoDB table generated by Kinesis Client Library when - // consuming from the stream, so that each unit test can start from - // scratch without prior history of data consumption - kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } after { @@ -96,25 +84,32 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . 
*/ testOrIgnore("basic operation") { - ssc = new StreamingContext(sc, Seconds(1)) - val aWSCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, - kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey) - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) - } - ssc.start() - - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - kinesisTestUtils.pushData(testData) - assert(collected === testData.toSet, "\nData received does not match data sent") + val kinesisTestUtils = new KinesisTestUtils() + try { + kinesisTestUtils.createStream() + ssc = new StreamingContext(sc, Seconds(1)) + val aWSCredentials = KinesisTestUtils.getAWSCredentials() + val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, + kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, + aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey) + + val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => + collected ++= rdd.collect() + logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) + } + ssc.start() + + val testData = 1 to 10 + eventually(timeout(120 seconds), interval(10 second)) { + kinesisTestUtils.pushData(testData) + assert(collected === testData.toSet, "\nData received does not match data sent") + } + ssc.stop() + } finally { + kinesisTestUtils.deleteStream() + kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } - ssc.stop() } } |