Diffstat (limited to 'extras/kinesis-asl/src/test/scala/org/apache/spark')
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala       37
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala  17
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala    120
3 files changed, 157 insertions, 17 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
new file mode 100644
index 0000000000..6d011f295e
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that runs the Kinesis real data transfer tests, or
+ * ignores them, depending on whether the required environment variable is set.
+ */
+trait KinesisSuiteHelper { self: SparkFunSuite =>
+ import KinesisTestUtils._
+
+ /** Run the test if the environment variable is set, otherwise ignore it */
+ def testOrIgnore(testName: String)(testBody: => Unit) {
+ if (shouldRunTests) {
+ test(testName)(testBody)
+ } else {
+ ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
+ }
+ }
+}
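
The trait above imports `shouldRunTests` and `envVarName` from `KinesisTestUtils`, which is not part of this diff. A minimal sketch of what that companion object might provide, assuming the flag is driven by the RUN_KINESIS_TESTS variable documented in KinesisStreamSuite below (hypothetical, for illustration only):

object KinesisTestUtils {
  // Name of the environment variable that gates the real data transfer tests
  val envVarName = "RUN_KINESIS_TESTS"

  // True only when the variable is explicitly set to "1"
  lazy val shouldRunTests: Boolean = sys.env.getOrElse(envVarName, "") == "1"
}
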
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 2103dca6b7..98f2c7c4f1 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -73,23 +73,6 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
checkpointStateMock, currentClockMock)
}
- test("KinesisUtils API") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- // Tests the API, does not actually test data receiving
- val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
- InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
- InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
- "awsAccessKey", "awsSecretKey")
-
- ssc.stop()
- }
-
test("check serializability of SerializableAWSCredentials") {
Utils.deserialize[SerializableAWSCredentials](
Utils.serialize(new SerializableAWSCredentials("x", "y")))
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
new file mode 100644
index 0000000000..d3dd541fe4
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
+ with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+
+ private val kinesisTestUtils = new KinesisTestUtils()
+
+ // This is the name that KCL uses to save metadata to DynamoDB
+ private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+
+ private var ssc: StreamingContext = _
+ private var sc: SparkContext = _
+
+ override def beforeAll(): Unit = {
+ kinesisTestUtils.createStream()
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName("KinesisStreamSuite") // Spark app name; the Kinesis app name used by KCL is set separately above
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll(): Unit = {
+ sc.stop()
+ // Delete the Kinesis stream as well as the DynamoDB table generated by
+ // Kinesis Client Library when consuming the stream
+ kinesisTestUtils.deleteStream()
+ kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+ }
+
+ before {
+ // Delete the DynamoDB table generated by Kinesis Client Library when
+ // consuming from the stream, so that each unit test can start from
+ // scratch without prior history of data consumption
+ kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop(stopSparkContext = false)
+ ssc = null
+ }
+ }
+
+ test("KinesisUtils API") {
+ ssc = new StreamingContext(sc, Seconds(1))
+ // Tests the API, does not actually test data receiving
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+ "awsAccessKey", "awsSecretKey")
+ }
+
+ /**
+ * Test the stream by sending data to a Kinesis stream and receiving it back.
+ * This test is not run by default, as it requires AWS credentials that the test
+ * environment may not have. Even if AWS credentials are available, the user may
+ * not want to run these tests, to avoid incurring Kinesis costs. To enable this
+ * test, you must have AWS credentials available through the default AWS provider
+ * chain, and you must set the environment variable RUN_KINESIS_TESTS=1.
+ */
+ testOrIgnore("basic operation") {
+ ssc = new StreamingContext(sc, Seconds(1))
+ val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+ val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
+ kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+
+ val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+ collected ++= rdd.collect()
+ logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+ }
+ ssc.start()
+
+ val testData = 1 to 10
+ eventually(timeout(120 seconds), interval(10 second)) {
+ kinesisTestUtils.pushData(testData)
+ assert(collected === testData.toSet, "\nData received does not match data sent")
+ }
+ ssc.stop()
+ }
+}
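
The "basic operation" test obtains credentials via `KinesisTestUtils.getAWSCredentials()`, which is also outside this diff. A plausible sketch, assuming it simply defers to the AWS SDK's default provider chain (environment variables, system properties, profile file, instance profile):

import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}

object AwsCredentialsSketch {
  // Hypothetical: resolve credentials through the AWS SDK's default provider
  // chain. getCredentials() throws when no credentials can be found, which
  // fails the test fast before it touches Kinesis.
  def getAWSCredentials(): AWSCredentials =
    new DefaultAWSCredentialsProviderChain().getCredentials()
}
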