aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-07-30 16:44:02 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-30 16:44:02 -0700
commit1afdeb7b458f86e2641f062fb9ddc00e9c5c7531 (patch)
tree014718dff32c1fef2bf0c794aa5fc8db220e713f
parent04c8409107710fc9a625ee513d68c149745539f3 (diff)
downloadspark-1afdeb7b458f86e2641f062fb9ddc00e9c5c7531.tar.gz
spark-1afdeb7b458f86e2641f062fb9ddc00e9c5c7531.tar.bz2
spark-1afdeb7b458f86e2641f062fb9ddc00e9c5c7531.zip
[STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to not throw weird errors when Kinesis tests are enabled without AWS keys
If Kinesis tests are enabled by env ENABLE_KINESIS_TESTS = 1 but no AWS credentials are found, the desired behavior is the fail the test using with ``` Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDSuite *** ABORTED *** (3 seconds, 5 milliseconds) [info] java.lang.Exception: Kinesis tests enabled, but could get not AWS credentials ``` Instead KinesisStreamSuite fails with ``` [info] - basic operation *** FAILED *** (3 seconds, 35 milliseconds) [info] java.lang.IllegalArgumentException: requirement failed: Stream not yet created, call createStream() to create one [info] at scala.Predef$.require(Predef.scala:233) [info] at org.apache.spark.streaming.kinesis.KinesisTestUtils.streamName(KinesisTestUtils.scala:77) [info] at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150) [info] at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150) [info] at org.apache.spark.Logging$class.logWarning(Logging.scala:71) [info] at org.apache.spark.streaming.kinesis.KinesisTestUtils.logWarning(KinesisTestUtils.scala:39) [info] at org.apache.spark.streaming.kinesis.KinesisTestUtils.deleteStream(KinesisTestUtils.scala:150) [info] at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply$mcV$sp(KinesisStreamSuite.scala:111) [info] at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86) [info] at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86) ``` This is because attempting to delete a non-existent Kinesis stream throws uncaught exception. This PR fixes it. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #7809 from tdas/kinesis-test-hotfix and squashes the following commits: 7c372e6 [Tathagata Das] Fixed test
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala27
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala4
2 files changed, 16 insertions, 15 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ff1b7ed0f..ca39358b75 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -53,6 +53,8 @@ private class KinesisTestUtils(
@volatile
private var streamCreated = false
+
+ @volatile
private var _streamName: String = _
private lazy val kinesisClient = {
@@ -115,21 +117,9 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}
- def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = {
- try {
- val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
- val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
- Some(desc)
- } catch {
- case rnfe: ResourceNotFoundException =>
- None
- }
- }
-
def deleteStream(): Unit = {
try {
- if (describeStream().nonEmpty) {
- val deleteStreamRequest = new DeleteStreamRequest()
+ if (streamCreated) {
kinesisClient.deleteStream(streamName)
}
} catch {
@@ -149,6 +139,17 @@ private class KinesisTestUtils(
}
}
+ private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
+ try {
+ val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+ val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+ Some(desc)
+ } catch {
+ case rnfe: ResourceNotFoundException =>
+ None
+ }
+ }
+
private def findNonExistentStreamName(): String = {
var testStreamName: String = null
do {
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index f9c952b946..b88c9c6478 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -88,11 +88,11 @@ class KinesisStreamSuite extends KinesisFunSuite
try {
kinesisTestUtils.createStream()
ssc = new StreamingContext(sc, Seconds(1))
- val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
- aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>