-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala  7
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala  4
2 files changed, 7 insertions, 4 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index b2e2a4246d..e81fb11e59 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -17,10 +17,10 @@
package org.apache.spark.streaming.kinesis
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
-import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
@@ -65,6 +65,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
  }

  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.deleteStream()
+    }
    if (sc != null) {
      sc.stop()
    }
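
Note on the three added lines: the null check matters because the suite only assigns testUtils when stream setup actually runs and succeeds, so afterAll must not call deleteStream() on a stream that was never created. A minimal sketch of the pattern follows; the class name and the beforeAll body are illustrative assumptions, not the actual suite, and it reuses the KinesisTestUtils.createStream()/deleteStream() calls seen elsewhere in this diff.

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch of the guarded-teardown pattern shown in the hunk above.
class GuardedTeardownExampleSuite extends KinesisFunSuite with BeforeAndAfterAll {

  private var testUtils: KinesisTestUtils = null
  private var sc: SparkContext = null

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("example"))
    // Stream setup can fail part-way (for example, without AWS credentials);
    // any reference not yet assigned at that point simply stays null.
    testUtils = new KinesisTestUtils()
    testUtils.createStream()
  }

  override def afterAll(): Unit = {
    // Tear down only what was actually created.
    if (testUtils != null) {
      testUtils.deleteStream()
    }
    if (sc != null) {
      sc.stop()
    }
  }
}
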
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 4992b04176..f9c952b946 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -59,7 +59,7 @@ class KinesisStreamSuite extends KinesisFunSuite
    }
  }

-  ignore("KinesisUtils API") {
+  test("KinesisUtils API") {
    ssc = new StreamingContext(sc, Seconds(1))
    // Tests the API, does not actually test data receiving
    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
@@ -83,7 +83,7 @@ class KinesisStreamSuite extends KinesisFunSuite
   * you must have AWS credentials available through the default AWS provider chain,
   * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
   */
-  ignore("basic operation") {
+  testIfEnabled("basic operation") {
    val kinesisTestUtils = new KinesisTestUtils()
    try {
      kinesisTestUtils.createStream()
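
The testIfEnabled helper used above is defined in KinesisFunSuite rather than in this diff. As a rough sketch of what such a gate can look like, assuming the RUN_KINESIS_TESTS=1 environment variable mentioned in the comment is the switch (the trait and field names below are illustrative, not Spark's actual helper):

import org.scalatest.FunSuite

// Hypothetical sketch of an environment-gated test helper in the spirit of
// testIfEnabled: run the test only when RUN_KINESIS_TESTS=1 is set, otherwise
// register it as ignored so the skip still shows up in the test report.
trait EnvGatedKinesisTests extends FunSuite {

  private val kinesisTestsEnabled: Boolean =
    sys.env.getOrElse("RUN_KINESIS_TESTS", "0") == "1"

  def testIfEnabled(testName: String)(testBody: => Unit): Unit = {
    if (kinesisTestsEnabled) {
      test(testName)(testBody)
    } else {
      ignore(testName + " [enable by setting RUN_KINESIS_TESTS=1]")(testBody)
    }
  }
}

With such a trait mixed in, a suite can declare testIfEnabled("basic operation") { ... } exactly as in the hunk above, and the test runs only on machines where the environment variable is set.
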