diff options
Diffstat (limited to 'external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala')
-rw-r--r-- | external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 320728f4bb..1026d0fcb5 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -70,9 +70,14 @@ import org.apache.spark.util.Utils * See the Kinesis Spark Streaming documentation for more * details on the different types of checkpoints. * @param storageLevel Storage level to use for storing the received objects - * @param kinesisCredsProvider SerializableCredentialsProvider instance that will be used to - * generate the AWSCredentialsProvider instance used for KCL - * authorization. + * @param kinesisCreds SparkAWSCredentials instance that will be used to generate the + * AWSCredentialsProvider passed to the KCL to authorize Kinesis API calls. + * @param cloudWatchCreds Optional SparkAWSCredentials instance that will be used to generate the + * AWSCredentialsProvider passed to the KCL to authorize CloudWatch API + * calls. Will use kinesisCreds if value is None. + * @param dynamoDBCreds Optional SparkAWSCredentials instance that will be used to generate the + * AWSCredentialsProvider passed to the KCL to authorize DynamoDB API calls. + * Will use kinesisCreds if value is None. */ private[kinesis] class KinesisReceiver[T]( val streamName: String, @@ -83,7 +88,9 @@ private[kinesis] class KinesisReceiver[T]( checkpointInterval: Duration, storageLevel: StorageLevel, messageHandler: Record => T, - kinesisCredsProvider: SerializableCredentialsProvider) + kinesisCreds: SparkAWSCredentials, + dynamoDBCreds: Option[SparkAWSCredentials], + cloudWatchCreds: Option[SparkAWSCredentials]) extends Receiver[T](storageLevel) with Logging { receiver => /* @@ -140,10 +147,13 @@ private[kinesis] class KinesisReceiver[T]( workerId = Utils.localHostName() + ":" + UUID.randomUUID() kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId) + val kinesisProvider = kinesisCreds.provider val kinesisClientLibConfiguration = new KinesisClientLibConfiguration( checkpointAppName, streamName, - kinesisCredsProvider.provider, + kinesisProvider, + dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), + cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), workerId) .withKinesisEndpoint(endpointUrl) .withInitialPositionInStream(initialPositionInStream) |