 extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala     | 197 +
 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala      |  37 +
 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala |  17 -
 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala   | 120 +
 4 files changed, 354 insertions(+), 17 deletions(-)
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000000..f6bf552e6b
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer data
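+ *
+ * A typical lifecycle looks roughly like the following sketch (assuming AWS credentials are
+ * available through the default provider chain and RUN_KINESIS_TESTS=1 is set; the table name
+ * passed to deleteDynamoDBTable is a hypothetical KCL application name):
+ * {{{
+ *   val testUtils = new KinesisTestUtils()
+ *   testUtils.createStream()
+ *   val shardIdToData = testUtils.pushData(1 to 10)
+ *   // ... consume the stream ...
+ *   testUtils.deleteStream()
+ *   testUtils.deleteDynamoDBTable("myKinesisAppName")
+ * }}}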
+ */
+private class KinesisTestUtils(
+ val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
+ _regionName: String = "") extends Logging {
+
+ val regionName = if (_regionName.length == 0) {
+ RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+ } else {
+ RegionUtils.getRegion(_regionName).getName()
+ }
+
+ val streamShardCount = 2
+
+ private val createStreamTimeoutSeconds = 300
+ private val describeStreamPollTimeSeconds = 1
+
+ @volatile
+ private var streamCreated = false
+ private var _streamName: String = _
+
+ private lazy val kinesisClient = {
+ val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+ client.setEndpoint(endpointUrl)
+ client
+ }
+
+ private lazy val dynamoDB = {
+ val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
+ dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+ new DynamoDB(dynamoDBClient)
+ }
+
+ def streamName: String = {
+ require(streamCreated, "Stream not yet created, call createStream() to create one")
+ _streamName
+ }
+
+ def createStream(): Unit = {
+ logInfo("Creating stream")
+ require(!streamCreated, "Stream already created")
+ _streamName = findNonExistentStreamName()
+
+ // Create a stream. The number of shards determines the provisioned throughput.
+ val createStreamRequest = new CreateStreamRequest()
+ createStreamRequest.setStreamName(_streamName)
+ createStreamRequest.setShardCount(streamShardCount)
+ kinesisClient.createStream(createStreamRequest)
+
+ // The stream is now being created. Wait for it to become active.
+ waitForStreamToBeActive(_streamName)
+ streamCreated = true
+ logInfo("Created stream")
+ }
+
+ /**
+ * Push data to Kinesis stream and return a map of
+ * shardId -> seq of (data, seq number) pushed to corresponding shard
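+ *
+ * For example, pushing Seq(1, 2, 3) to a 2-shard stream might return (the shard ids and
+ * sequence numbers below are hypothetical):
+ * {{{
+ *   Map("shardId-000000000000" -> Seq((1, "49540...")),
+ *       "shardId-000000000001" -> Seq((2, "49541..."), (3, "49542...")))
+ * }}}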
+ */
+ def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+ require(streamCreated, "Stream not yet created, call createStream() to create one")
+ val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+
+ testData.foreach { num =>
+ val str = num.toString
+ val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+ .withData(ByteBuffer.wrap(str.getBytes()))
+ .withPartitionKey(str)
+
+ val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+ val shardId = putRecordResult.getShardId
+ val seqNumber = putRecordResult.getSequenceNumber()
+ val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+ new ArrayBuffer[(Int, String)]())
+ sentSeqNumbers += ((num, seqNumber))
+ }
+
+ logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+ shardIdToSeqNumbers.toMap
+ }
+
+ def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = {
+ try {
+ val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+ val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+ Some(desc)
+ } catch {
+ case rnfe: ResourceNotFoundException =>
+ None
+ }
+ }
+
+ def deleteStream(): Unit = {
+ try {
+ if (describeStream().nonEmpty) {
+ kinesisClient.deleteStream(streamName)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(s"Could not delete stream $streamName")
+ }
+ }
+
+ def deleteDynamoDBTable(tableName: String): Unit = {
+ try {
+ val table = dynamoDB.getTable(tableName)
+ table.delete()
+ table.waitForDelete()
+ } catch {
+ case e: Exception =>
+ logWarning(s"Could not delete DynamoDB table $tableName")
+ }
+ }
+
+ private def findNonExistentStreamName(): String = {
+ var testStreamName: String = null
+ do {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+ testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+ } while (describeStream(testStreamName).nonEmpty)
+ testStreamName
+ }
+
+ private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+ val startTime = System.currentTimeMillis()
+ val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+ while (System.currentTimeMillis() < endTime) {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+ describeStream(streamNameToWaitFor).foreach { description =>
+ val streamStatus = description.getStreamStatus()
+ logDebug(s"\t- current state: $streamStatus\n")
+ if ("ACTIVE".equals(streamStatus)) {
+ return
+ }
+ }
+ }
+ require(false, s"Stream $streamNameToWaitFor never became active")
+ }
+}
+
+private[kinesis] object KinesisTestUtils {
+
+ val envVarName = "RUN_KINESIS_TESTS"
+
+ val shouldRunTests = sys.env.get(envVarName) == Some("1")
+
+ def isAWSCredentialsPresent: Boolean = {
+ Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+ }
+
+ def getAWSCredentials(): AWSCredentials = {
+ assert(shouldRunTests,
+ "Kinesis test not enabled, should not attempt to get AWS credentials")
+ Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+ case Success(cred) => cred
+ case Failure(e) =>
+ throw new Exception("Kinesis tests enabled, but could not get AWS credentials")
+ }
+ }
+}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
new file mode 100644
index 0000000000..6d011f295e
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that either runs real Kinesis data transfer tests or ignores them,
+ * depending on whether the corresponding environment variable is set.
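+ *
+ * A minimal usage sketch (the suite name below is hypothetical):
+ * {{{
+ *   class MyKinesisSuite extends SparkFunSuite with KinesisSuiteHelper {
+ *     testOrIgnore("transfers data through a real Kinesis stream") {
+ *       // body runs only when RUN_KINESIS_TESTS=1; otherwise the test is reported as ignored
+ *     }
+ *   }
+ * }}}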
+ */
+trait KinesisSuiteHelper { self: SparkFunSuite =>
+ import KinesisTestUtils._
+
+ /** Run the test if the environment variable is set, otherwise register it as ignored. */
+ def testOrIgnore(testName: String)(testBody: => Unit): Unit = {
+ if (shouldRunTests) {
+ test(testName)(testBody)
+ } else {
+ ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
+ }
+ }
+}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 2103dca6b7..98f2c7c4f1 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -73,23 +73,6 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
checkpointStateMock, currentClockMock)
}
- test("KinesisUtils API") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- // Tests the API, does not actually test data receiving
- val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
- InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
- InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
- "awsAccessKey", "awsSecretKey")
-
- ssc.stop()
- }
-
test("check serializability of SerializableAWSCredentials") {
Utils.deserialize[SerializableAWSCredentials](
Utils.serialize(new SerializableAWSCredentials("x", "y")))
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
new file mode 100644
index 0000000000..d3dd541fe4
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
+ with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+
+ private val kinesisTestUtils = new KinesisTestUtils()
+
+ // This is the name that KCL uses to save metadata to DynamoDB
+ private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+
+ private var ssc: StreamingContext = _
+ private var sc: SparkContext = _
+
+ override def beforeAll(): Unit = {
+ kinesisTestUtils.createStream()
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName("KinesisStreamSuite") // Spark app name; the KCL app name (kinesisAppName) is passed separately
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll(): Unit = {
+ sc.stop()
+ // Delete the Kinesis stream as well as the DynamoDB table generated by
+ // Kinesis Client Library when consuming the stream
+ kinesisTestUtils.deleteStream()
+ kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+ }
+
+ before {
+ // Delete the DynamoDB table generated by Kinesis Client Library when
+ // consuming from the stream, so that each unit test can start from
+ // scratch without prior history of data consumption
+ kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop(stopSparkContext = false)
+ ssc = null
+ }
+ }
+
+ test("KinesisUtils API") {
+ ssc = new StreamingContext(sc, Seconds(1))
+ // Tests the API, does not actually test data receiving
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+ "awsAccessKey", "awsSecretKey")
+ }
+
+ /**
+ * Test the stream by sending data to a Kinesis stream and receiving it back.
+ * This test is not run by default, since it requires AWS credentials that the test
+ * environment may not have. Even when AWS credentials are available, a user may not
+ * want to run these tests in order to avoid incurring Kinesis costs. To enable this
+ * test, AWS credentials must be available through the default AWS credentials provider
+ * chain, and the environment variable RUN_KINESIS_TESTS must be set to 1.
+ */
+ testOrIgnore("basic operation") {
+ ssc = new StreamingContext(sc, Seconds(1))
+ val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+ val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
+ kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+
+ val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+ val data = rdd.collect()
+ collected ++= data
+ logInfo("Collected = " + data.mkString(", "))
+ }
+ ssc.start()
+
+ val testData = 1 to 10
+ eventually(timeout(120 seconds), interval(10 second)) {
+ kinesisTestUtils.pushData(testData)
+ assert(collected === testData.toSet, "\nData received does not match data sent")
+ }
+ ssc.stop()
+ }
+}