diff options
Diffstat (limited to 'external/kinesis-asl/src/test/scala/org/apache')
4 files changed, 216 insertions, 24 deletions
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala new file mode 100644 index 0000000000..1c130654f3 --- /dev/null +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kinesis + +import java.lang.IllegalArgumentException + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase} + +class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach + with MockitoSugar { + import KinesisInputDStream._ + + private val ssc = new StreamingContext(conf, batchDuration) + private val streamName = "a-very-nice-kinesis-stream-name" + private val checkpointAppName = "a-very-nice-kcl-app-name" + private def baseBuilder = KinesisInputDStream.builder + private def builder = baseBuilder.streamingContext(ssc) + .streamName(streamName) + .checkpointAppName(checkpointAppName) + + override def afterAll(): Unit = { + ssc.stop() + } + + test("should raise an exception if the StreamingContext is missing") { + intercept[IllegalArgumentException] { + baseBuilder.streamName(streamName).checkpointAppName(checkpointAppName).build() + } + } + + test("should raise an exception if the stream name is missing") { + intercept[IllegalArgumentException] { + baseBuilder.streamingContext(ssc).checkpointAppName(checkpointAppName).build() + } + } + + test("should raise an exception if the checkpoint app name is missing") { + intercept[IllegalArgumentException] { + baseBuilder.streamingContext(ssc).streamName(streamName).build() + } + } + + test("should propagate required values to KinesisInputDStream") { + val dstream = builder.build() + assert(dstream.context == ssc) + assert(dstream.streamName == streamName) + assert(dstream.checkpointAppName == checkpointAppName) + } + + test("should propagate default values to KinesisInputDStream") { + val dstream = builder.build() + assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL) + assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME) + assert(dstream.initialPositionInStream == DEFAULT_INITIAL_POSITION_IN_STREAM) + assert(dstream.checkpointInterval == batchDuration) + assert(dstream._storageLevel == DEFAULT_STORAGE_LEVEL) + assert(dstream.kinesisCreds == DefaultCredentials) + assert(dstream.dynamoDBCreds == None) + assert(dstream.cloudWatchCreds == None) + } + + test("should propagate custom non-auth values to KinesisInputDStream") { + val customEndpointUrl = "https://kinesis.us-west-2.amazonaws.com" + val customRegion = "us-west-2" + val customInitialPosition = InitialPositionInStream.TRIM_HORIZON + val customAppName = "a-very-nice-kinesis-app" + val customCheckpointInterval = Seconds(30) + val customStorageLevel = StorageLevel.MEMORY_ONLY + val customKinesisCreds = mock[SparkAWSCredentials] + val customDynamoDBCreds = mock[SparkAWSCredentials] + val customCloudWatchCreds = mock[SparkAWSCredentials] + + val dstream = builder + .endpointUrl(customEndpointUrl) + .regionName(customRegion) + .initialPositionInStream(customInitialPosition) + .checkpointAppName(customAppName) + .checkpointInterval(customCheckpointInterval) + .storageLevel(customStorageLevel) + .kinesisCredentials(customKinesisCreds) + .dynamoDBCredentials(customDynamoDBCreds) + .cloudWatchCredentials(customCloudWatchCreds) + .build() + assert(dstream.endpointUrl == customEndpointUrl) + assert(dstream.regionName == customRegion) + assert(dstream.initialPositionInStream == customInitialPosition) + assert(dstream.checkpointAppName == customAppName) + assert(dstream.checkpointInterval == customCheckpointInterval) + assert(dstream._storageLevel == customStorageLevel) + assert(dstream.kinesisCreds == customKinesisCreds) + assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds)) + assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds)) + } +} diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index deb411d73e..3b14c8471e 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -31,7 +31,6 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.mock.MockitoSugar import org.apache.spark.streaming.{Duration, TestSuiteBase} -import org.apache.spark.util.Utils /** * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor @@ -62,28 +61,6 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft checkpointerMock = mock[IRecordProcessorCheckpointer] } - test("check serializability of credential provider classes") { - Utils.deserialize[BasicCredentialsProvider]( - Utils.serialize(BasicCredentialsProvider( - awsAccessKeyId = "x", - awsSecretKey = "y"))) - - Utils.deserialize[STSCredentialsProvider]( - Utils.serialize(STSCredentialsProvider( - stsRoleArn = "fakeArn", - stsSessionName = "fakeSessionName", - stsExternalId = Some("fakeExternalId")))) - - Utils.deserialize[STSCredentialsProvider]( - Utils.serialize(STSCredentialsProvider( - stsRoleArn = "fakeArn", - stsSessionName = "fakeSessionName", - stsExternalId = Some("fakeExternalId"), - longLivedCredsProvider = BasicCredentialsProvider( - awsAccessKeyId = "x", - awsSecretKey = "y")))) - } - test("process records including store and set checkpointer") { when(receiverMock.isStopped()).thenReturn(false) when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index afb55c84f8..ed7e358050 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -138,7 +138,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun assert(kinesisRDD.regionName === dummyRegionName) assert(kinesisRDD.endpointUrl === dummyEndpointUrl) assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds) - assert(kinesisRDD.kinesisCredsProvider === BasicCredentialsProvider( + assert(kinesisRDD.kinesisCreds === BasicCredentials( awsAccessKeyId = dummyAWSAccessKey, awsSecretKey = dummyAWSSecretKey)) assert(nonEmptyRDD.partitions.size === blockInfos.size) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala new file mode 100644 index 0000000000..f579c2c3a6 --- /dev/null +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentialsBuilderSuite.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.TestSuiteBase +import org.apache.spark.util.Utils + +class SparkAWSCredentialsBuilderSuite extends TestSuiteBase { + private def builder = SparkAWSCredentials.builder + + private val basicCreds = BasicCredentials( + awsAccessKeyId = "a-very-nice-access-key", + awsSecretKey = "a-very-nice-secret-key") + + private val stsCreds = STSCredentials( + stsRoleArn = "a-very-nice-role-arn", + stsSessionName = "a-very-nice-secret-key", + stsExternalId = Option("a-very-nice-external-id"), + longLivedCreds = basicCreds) + + test("should build DefaultCredentials when given no params") { + assert(builder.build() == DefaultCredentials) + } + + test("should build BasicCredentials") { + assertResult(basicCreds) { + builder.basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey) + .build() + } + } + + test("should build STSCredentials") { + // No external ID, default long-lived creds + assertResult(stsCreds.copy(stsExternalId = None, longLivedCreds = DefaultCredentials)) { + builder.stsCredentials(stsCreds.stsRoleArn, stsCreds.stsSessionName) + .build() + } + // Default long-lived creds + assertResult(stsCreds.copy(longLivedCreds = DefaultCredentials)) { + builder.stsCredentials( + stsCreds.stsRoleArn, + stsCreds.stsSessionName, + stsCreds.stsExternalId.get) + .build() + } + // No external ID, basic keypair for long-lived creds + assertResult(stsCreds.copy(stsExternalId = None)) { + builder.stsCredentials(stsCreds.stsRoleArn, stsCreds.stsSessionName) + .basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey) + .build() + } + // Basic keypair for long-lived creds + assertResult(stsCreds) { + builder.stsCredentials( + stsCreds.stsRoleArn, + stsCreds.stsSessionName, + stsCreds.stsExternalId.get) + .basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey) + .build() + } + // Order shouldn't matter + assertResult(stsCreds) { + builder.basicCredentials(basicCreds.awsAccessKeyId, basicCreds.awsSecretKey) + .stsCredentials( + stsCreds.stsRoleArn, + stsCreds.stsSessionName, + stsCreds.stsExternalId.get) + .build() + } + } + + test("SparkAWSCredentials classes should be serializable") { + assertResult(basicCreds) { + Utils.deserialize[BasicCredentials](Utils.serialize(basicCreds)) + } + assertResult(stsCreds) { + Utils.deserialize[STSCredentials](Utils.serialize(stsCreds)) + } + // Will also test if DefaultCredentials can be serialized + val stsDefaultCreds = stsCreds.copy(longLivedCreds = DefaultCredentials) + assertResult(stsDefaultCreds) { + Utils.deserialize[STSCredentials](Utils.serialize(stsDefaultCreds)) + } + } +} |