Diffstat (limited to 'external/kinesis-asl/src/test/scala/org/apache')
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala | 115
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala            |  23
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala              |   2
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala | 100
4 files changed, 216 insertions(+), 24 deletions(-)
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
new file mode 100644
index 0000000000..1c130654f3
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.lang.IllegalArgumentException
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase}
+
+class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
+ with MockitoSugar {
+ import KinesisInputDStream._
+
+ private val ssc = new StreamingContext(conf, batchDuration)
+ private val streamName = "a-very-nice-kinesis-stream-name"
+ private val checkpointAppName = "a-very-nice-kcl-app-name"
+ private def baseBuilder = KinesisInputDStream.builder
+ private def builder = baseBuilder.streamingContext(ssc)
+ .streamName(streamName)
+ .checkpointAppName(checkpointAppName)
+
+ override def afterAll(): Unit = {
+ ssc.stop()
+ }
+
+ test("should raise an exception if the StreamingContext is missing") {
+ intercept[IllegalArgumentException] {
+ baseBuilder.streamName(streamName).checkpointAppName(checkpointAppName).build()
+ }
+ }
+
+ test("should raise an exception if the stream name is missing") {
+ intercept[IllegalArgumentException] {
+ baseBuilder.streamingContext(ssc).checkpointAppName(checkpointAppName).build()
+ }
+ }
+
+ test("should raise an exception if the checkpoint app name is missing") {
+ intercept[IllegalArgumentException] {
+ baseBuilder.streamingContext(ssc).streamName(streamName).build()
+ }
+ }
+
+ test("should propagate required values to KinesisInputDStream") {
+ val dstream = builder.build()
+ assert(dstream.context == ssc)
+ assert(dstream.streamName == streamName)
+ assert(dstream.checkpointAppName == checkpointAppName)
+ }
+
+ test("should propagate default values to KinesisInputDStream") {
+ val dstream = builder.build()
+ assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL)
+ assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME)
+ assert(dstream.initialPositionInStream == DEFAULT_INITIAL_POSITION_IN_STREAM)
+ assert(dstream.checkpointInterval == batchDuration)
+ assert(dstream._storageLevel == DEFAULT_STORAGE_LEVEL)
+ assert(dstream.kinesisCreds == DefaultCredentials)
+ assert(dstream.dynamoDBCreds == None)
+ assert(dstream.cloudWatchCreds == None)
+ }
+
+ test("should propagate custom non-auth values to KinesisInputDStream") {
+ val customEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+ val customRegion = "us-west-2"
+ val customInitialPosition = InitialPositionInStream.TRIM_HORIZON
+ val customAppName = "a-very-nice-kinesis-app"
+ val customCheckpointInterval = Seconds(30)
+ val customStorageLevel = StorageLevel.MEMORY_ONLY
+ val customKinesisCreds = mock[SparkAWSCredentials]
+ val customDynamoDBCreds = mock[SparkAWSCredentials]
+ val customCloudWatchCreds = mock[SparkAWSCredentials]
+
+ val dstream = builder
+ .endpointUrl(customEndpointUrl)
+ .regionName(customRegion)
+ .initialPositionInStream(customInitialPosition)
+ .checkpointAppName(customAppName)
+ .checkpointInterval(customCheckpointInterval)
+ .storageLevel(customStorageLevel)
+ .kinesisCredentials(customKinesisCreds)
+ .dynamoDBCredentials(customDynamoDBCreds)
+ .cloudWatchCredentials(customCloudWatchCreds)
+ .build()
+ assert(dstream.endpointUrl == customEndpointUrl)
+ assert(dstream.regionName == customRegion)
+ assert(dstream.initialPositionInStream == customInitialPosition)
+ assert(dstream.checkpointAppName == customAppName)
+ assert(dstream.checkpointInterval == customCheckpointInterval)
+ assert(dstream._storageLevel == customStorageLevel)
+ assert(dstream.kinesisCreds == customKinesisCreds)
+ assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
+ assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+ }
+}
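For reference, a minimal usage sketch of the builder API this new suite covers, written against the same setters the tests above exercise (the SparkConf setup, stream name, and app name are illustrative placeholders, not part of this change):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

val ssc = new StreamingContext(new SparkConf().setAppName("placeholder"), Seconds(1))

// streamingContext, streamName, and checkpointAppName are required; as the
// tests above assert, build() throws IllegalArgumentException when any of
// them is missing.
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("placeholder-stream")      // illustrative value
  .checkpointAppName("placeholder-app")  // illustrative value
  // The remaining setters are optional and fall back to the DEFAULT_*
  // constants checked in the "default values" test above.
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_ONLY)
  .build()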
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index deb411d73e..3b14c8471e 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -31,7 +31,6 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mock.MockitoSugar
import org.apache.spark.streaming.{Duration, TestSuiteBase}
-import org.apache.spark.util.Utils
/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
@@ -62,28 +61,6 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
checkpointerMock = mock[IRecordProcessorCheckpointer]
}
- test("check serializability of credential provider classes") {
- Utils.deserialize[BasicCredentialsProvider](
- Utils.serialize(BasicCredentialsProvider(
- awsAccessKeyId = "x",
- awsSecretKey = "y")))
-
- Utils.deserialize[STSCredentialsProvider](
- Utils.serialize(STSCredentialsProvider(
- stsRoleArn = "fakeArn",
- stsSessionName = "fakeSessionName",
- stsExternalId = Some("fakeExternalId"))))
-
- Utils.deserialize[STSCredentialsProvider](
- Utils.serialize(STSCredentialsProvider(
- stsRoleArn = "fakeArn",
- stsSessionName = "fakeSessionName",
- stsExternalId = Some("fakeExternalId"),
- longLivedCredsProvider = BasicCredentialsProvider(
- awsAccessKeyId = "x",
- awsSecretKey = "y"))))
- }
-
test("process records including store and set checkpointer") {
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index afb55c84f8..ed7e358050 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -138,7 +138,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
assert(kinesisRDD.regionName === dummyRegionName)
assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
- assert(kinesisRDD.kinesisCredsProvider === BasicCredentialsProvider(
+ assert(kinesisRDD.kinesisCreds === BasicCredentials(
awsAccessKeyId = dummyAWSAccessKey,
awsSecretKey = dummyAWSSecretKey))
assert(nonEmptyRDD.partitions.size === blockInfos.size)
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala
new file mode 100644
index 0000000000..f579c2c3a6
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.TestSuiteBase
+import org.apache.spark.util.Utils
+
+class SparkAWSCredentialsBuilderSuite extends TestSuiteBase {
+ private def builder = SparkAWSCredentials.builder
+
+ private val basicCreds = BasicCredentials(
+ awsAccessKeyId = "a-very-nice-access-key",
+ awsSecretKey = "a-very-nice-secret-key")
+
+ private val stsCreds = STSCredentials(
+ stsRoleArn = "a-very-nice-role-arn",
+ stsSessionName = "a-very-nice-session-name",
+ stsExternalId = Option("a-very-nice-external-id"),
+ longLivedCreds = basicCreds)
+
+ test("should build DefaultCredentials when given no params") {
+ assert(builder.build() == DefaultCredentials)
+ }
+
+ test("should build BasicCredentials") {
+ assertResult(basicCreds) {
+ builder.basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey)
+ .build()
+ }
+ }
+
+ test("should build STSCredentials") {
+ // No external ID, default long-lived creds
+ assertResult(stsCreds.copy(stsExternalId = None, longLivedCreds = DefaultCredentials)) {
+ builder.stsCredentials(stsCreds.stsRoleArn, stsCreds.stsSessionName)
+ .build()
+ }
+ // Default long-lived creds
+ assertResult(stsCreds.copy(longLivedCreds = DefaultCredentials)) {
+ builder.stsCredentials(
+ stsCreds.stsRoleArn,
+ stsCreds.stsSessionName,
+ stsCreds.stsExternalId.get)
+ .build()
+ }
+ // No external ID, basic keypair for long-lived creds
+ assertResult(stsCreds.copy(stsExternalId = None)) {
+ builder.stsCredentials(stsCreds.stsRoleArn, stsCreds.stsSessionName)
+ .basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey)
+ .build()
+ }
+ // Basic keypair for long-lived creds
+ assertResult(stsCreds) {
+ builder.stsCredentials(
+ stsCreds.stsRoleArn,
+ stsCreds.stsSessionName,
+ stsCreds.stsExternalId.get)
+ .basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey)
+ .build()
+ }
+ // Order shouldn't matter
+ assertResult(stsCreds) {
+ builder.basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey)
+ .stsCredentials(
+ stsCreds.stsRoleArn,
+ stsCreds.stsSessionName,
+ stsCreds.stsExternalId.get)
+ .build()
+ }
+ }
+
+ test("SparkAWSCredentials classes should be serializable") {
+ assertResult(basicCreds) {
+ Utils.deserialize[BasicCredentials](Utils.serialize(basicCreds))
+ }
+ assertResult(stsCreds) {
+ Utils.deserialize[STSCredentials](Utils.serialize(stsCreds))
+ }
+ // This also checks that DefaultCredentials itself is serializable
+ val stsDefaultCreds = stsCreds.copy(longLivedCreds = DefaultCredentials)
+ assertResult(stsDefaultCreds) {
+ Utils.deserialize[STSCredentials](Utils.serialize(stsDefaultCreds))
+ }
+ }
+}
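And a companion sketch of the SparkAWSCredentials chaining this suite pins down (the access key, secret key, role ARN, session name, and external ID are placeholders):

import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// With no setters, build() yields DefaultCredentials (the AWS default
// provider chain), per the first test above.
val defaultCreds = SparkAWSCredentials.builder.build()

// An STS role backed by a basic keypair as the long-lived credentials.
// As the "order shouldn't matter" case shows, basicCredentials and
// stsCredentials may be chained in either order.
val stsCreds = SparkAWSCredentials.builder
  .basicCredentials("placeholder-access-key-id", "placeholder-secret-key")
  .stsCredentials("placeholder-role-arn", "placeholder-session", "placeholder-external-id")
  .build()

// The resulting values plug into the DStream builder shown earlier, e.g.
// KinesisInputDStream.builder ... .kinesisCredentials(stsCreds).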