aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala20
1 files changed, 15 insertions, 5 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 320728f4bb..1026d0fcb5 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -70,9 +70,14 @@ import org.apache.spark.util.Utils
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects
- * @param kinesisCredsProvider SerializableCredentialsProvider instance that will be used to
- * generate the AWSCredentialsProvider instance used for KCL
- * authorization.
+ * @param kinesisCreds SparkAWSCredentials instance that will be used to generate the
+ * AWSCredentialsProvider passed to the KCL to authorize Kinesis API calls.
+ * @param cloudWatchCreds Optional SparkAWSCredentials instance that will be used to generate the
+ * AWSCredentialsProvider passed to the KCL to authorize CloudWatch API
+ * calls. Will use kinesisCreds if value is None.
+ * @param dynamoDBCreds Optional SparkAWSCredentials instance that will be used to generate the
+ * AWSCredentialsProvider passed to the KCL to authorize DynamoDB API calls.
+ * Will use kinesisCreds if value is None.
*/
private[kinesis] class KinesisReceiver[T](
val streamName: String,
@@ -83,7 +88,9 @@ private[kinesis] class KinesisReceiver[T](
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
- kinesisCredsProvider: SerializableCredentialsProvider)
+ kinesisCreds: SparkAWSCredentials,
+ dynamoDBCreds: Option[SparkAWSCredentials],
+ cloudWatchCreds: Option[SparkAWSCredentials])
extends Receiver[T](storageLevel) with Logging { receiver =>
/*
@@ -140,10 +147,13 @@ private[kinesis] class KinesisReceiver[T](
workerId = Utils.localHostName() + ":" + UUID.randomUUID()
kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
+ val kinesisProvider = kinesisCreds.provider
val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
checkpointAppName,
streamName,
- kinesisCredsProvider.provider,
+ kinesisProvider,
+ dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+ cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)