aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorAdam Budde <budde@amazon.com>2017-02-22 11:32:36 -0500
committerBurak Yavuz <brkyvz@gmail.com>2017-02-22 11:32:36 -0500
commite4065376d2b4eec178a119476fa95b26f440c076 (patch)
tree83d86c58abbd1c004daad6f30df5e544064491c8 /python/pyspark
parent10c566cc3b5f93ddd823b1c987d9de8286cdea60 (diff)
downloadspark-e4065376d2b4eec178a119476fa95b26f440c076.tar.gz
spark-e4065376d2b4eec178a119476fa95b26f440c076.tar.bz2
spark-e4065376d2b4eec178a119476fa95b26f440c076.zip
[SPARK-19405][STREAMING] Support for cross-account Kinesis reads via STS
- Add dependency on aws-java-sdk-sts - Replace SerializableAWSCredentials with new SerializableCredentialsProvider interface - Make KinesisReceiver take SerializableCredentialsProvider as argument and pass credential provider to KCL - Add new implementations of KinesisUtils.createStream() that take STS arguments - Make JavaKinesisStreamSuite test the entire KinesisUtils Java API - Update KCL/AWS SDK dependencies to 1.7.x/1.11.x ## What changes were proposed in this pull request? [JIRA link with detailed description.](https://issues.apache.org/jira/browse/SPARK-19405) * Replace SerializableAWSCredentials with new SerializableKCLAuthProvider class that takes 5 optional config params for configuring AWS auth and returns the appropriate credential provider object * Add new public createStream() APIs for specifying these parameters in KinesisUtils ## How was this patch tested? * Manually tested using explicit keypair and instance profile to read data from Kinesis stream in separate account (difficult to write a test orchestrating creation and assumption of IAM roles across separate accounts) * Expanded JavaKinesisStreamSuite to test the entire Java API in KinesisUtils ## License acknowledgement This contribution is my original work and I license the work to the project under the project’s open source license. Author: Budde <budde@amazon.com> Closes #16744 from budde/master.
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/streaming/kinesis.py12
1 file changed, 10 insertions, 2 deletions
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index 3a8d8b819f..b839859c45 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -37,7 +37,8 @@ class KinesisUtils(object):
def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
initialPositionInStream, checkpointInterval,
storageLevel=StorageLevel.MEMORY_AND_DISK_2,
- awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder):
+ awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder,
+ stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None):
"""
Create an input stream that pulls messages from a Kinesis stream. This uses the
Kinesis Client Library (KCL) to pull messages from Kinesis.
@@ -67,6 +68,12 @@ class KinesisUtils(object):
:param awsSecretKey: AWS SecretKey (default is None. If None, will use
DefaultAWSCredentialsProviderChain)
:param decoder: A function used to decode value (default is utf8_decoder)
+ :param stsAssumeRoleArn: ARN of IAM role to assume when using STS sessions to read from
+ the Kinesis stream (default is None).
+ :param stsSessionName: Name to uniquely identify STS sessions used to read from Kinesis
+ stream, if STS is being used (default is None).
+ :param stsExternalId: External ID that can be used to validate against the assumed IAM
+ role's trust policy, if STS is being used (default is None).
:return: A DStream object
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
@@ -81,7 +88,8 @@ class KinesisUtils(object):
raise
jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
regionName, initialPositionInStream, jduration, jlevel,
- awsAccessKeyId, awsSecretKey)
+ awsAccessKeyId, awsSecretKey, stsAssumeRoleArn,
+ stsSessionName, stsExternalId)
stream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))