aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-07-19 20:34:30 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-19 20:34:30 -0700
commit93eb2acfb287807355ba5d77989d239fdd6e2c30 (patch)
treefcd881495be2c80b08fbd084ffe23ece235de679
parent163e3f1df94f6b7d3dadb46a87dbb3a2bade3f95 (diff)
downloadspark-93eb2acfb287807355ba5d77989d239fdd6e2c30.tar.gz
spark-93eb2acfb287807355ba5d77989d239fdd6e2c30.tar.bz2
spark-93eb2acfb287807355ba5d77989d239fdd6e2c30.zip
[SPARK-9030] [STREAMING] [HOTFIX] Make sure that no attempts to create Kinesis streams is made when no enabled
Problem: Even when the environment variable to enable tests are disabled, the `beforeAll` of the KinesisStreamSuite attempted to find AWS credentials to create Kinesis stream, and failed. Solution: Made sure all accesses to KinesisTestUtils, that created streams, is under `testOrIgnore` Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #7519 from tdas/kinesis-tests and squashes the following commits: 64d6d06 [Tathagata Das] Removed empty lines. 7c18473 [Tathagata Das] Putting all access to KinesisTestUtils inside testOrIgnore
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala57
1 files changed, 26 insertions, 31 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index d3dd541fe4..50f71413ab 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -33,8 +33,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
with Eventually with BeforeAndAfter with BeforeAndAfterAll {
- private val kinesisTestUtils = new KinesisTestUtils()
-
// This is the name that KCL uses to save metadata to DynamoDB
private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
@@ -42,7 +40,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
private var sc: SparkContext = _
override def beforeAll(): Unit = {
- kinesisTestUtils.createStream()
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
@@ -53,15 +50,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
sc.stop()
// Delete the Kinesis stream as well as the DynamoDB table generated by
// Kinesis Client Library when consuming the stream
- kinesisTestUtils.deleteStream()
- kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
- }
-
- before {
- // Delete the DynamoDB table generated by Kinesis Client Library when
- // consuming from the stream, so that each unit test can start from
- // scratch without prior history of data consumption
- kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
}
after {
@@ -96,25 +84,32 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
* and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
*/
testOrIgnore("basic operation") {
- ssc = new StreamingContext(sc, Seconds(1))
- val aWSCredentials = KinesisTestUtils.getAWSCredentials()
- val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
- kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
- Seconds(10), StorageLevel.MEMORY_ONLY,
- aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
-
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
- stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
- }
- ssc.start()
-
- val testData = 1 to 10
- eventually(timeout(120 seconds), interval(10 second)) {
- kinesisTestUtils.pushData(testData)
- assert(collected === testData.toSet, "\nData received does not match data sent")
+ val kinesisTestUtils = new KinesisTestUtils()
+ try {
+ kinesisTestUtils.createStream()
+ ssc = new StreamingContext(sc, Seconds(1))
+ val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+ val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
+ kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+
+ val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+ collected ++= rdd.collect()
+ logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+ }
+ ssc.start()
+
+ val testData = 1 to 10
+ eventually(timeout(120 seconds), interval(10 second)) {
+ kinesisTestUtils.pushData(testData)
+ assert(collected === testData.toSet, "\nData received does not match data sent")
+ }
+ ssc.stop()
+ } finally {
+ kinesisTestUtils.deleteStream()
+ kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
}
- ssc.stop()
}
}