diff options
author | Adam Budde <budde@amazon.com> | 2017-02-22 11:32:36 -0500 |
---|---|---|
committer | Burak Yavuz <brkyvz@gmail.com> | 2017-02-22 11:32:36 -0500 |
commit | e4065376d2b4eec178a119476fa95b26f440c076 (patch) | |
tree | 83d86c58abbd1c004daad6f30df5e544064491c8 /python/pyspark | |
parent | 10c566cc3b5f93ddd823b1c987d9de8286cdea60 (diff) | |
download | spark-e4065376d2b4eec178a119476fa95b26f440c076.tar.gz spark-e4065376d2b4eec178a119476fa95b26f440c076.tar.bz2 spark-e4065376d2b4eec178a119476fa95b26f440c076.zip |
[SPARK-19405][STREAMING] Support for cross-account Kinesis reads via STS
- Add dependency on aws-java-sdk-sts
- Replace SerializableAWSCredentials with new SerializableCredentialsProvider interface
- Make KinesisReceiver take SerializableCredentialsProvider as argument and
pass credential provider to KCL
- Add new implementations of KinesisUtils.createStream() that take STS
arguments
- Make JavaKinesisStreamSuite test the entire KinesisUtils Java API
- Update KCL/AWS SDK dependencies to 1.7.x/1.11.x
## What changes were proposed in this pull request?
[JIRA link with detailed description.](https://issues.apache.org/jira/browse/SPARK-19405)
* Replace SerializableAWSCredentials with new SerializableKCLAuthProvider class that takes 5 optional config params for configuring AWS auth and returns the appropriate credential provider object
* Add new public createStream() APIs for specifying these parameters in KinesisUtils
## How was this patch tested?
* Manually tested using explicit keypair and instance profile to read data from Kinesis stream in separate account (difficult to write a test orchestrating creation and assumption of IAM roles across separate accounts)
* Expanded JavaKinesisStreamSuite to test the entire Java API in KinesisUtils
## License acknowledgement
This contribution is my original work and that I license the work to the project under the project’s open source license.
Author: Budde <budde@amazon.com>
Closes #16744 from budde/master.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/streaming/kinesis.py | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py index 3a8d8b819f..b839859c45 100644 --- a/python/pyspark/streaming/kinesis.py +++ b/python/pyspark/streaming/kinesis.py @@ -37,7 +37,8 @@ class KinesisUtils(object): def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream, checkpointInterval, storageLevel=StorageLevel.MEMORY_AND_DISK_2, - awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder): + awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder, + stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None): """ Create an input stream that pulls messages from a Kinesis stream. This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. @@ -67,6 +68,12 @@ class KinesisUtils(object): :param awsSecretKey: AWS SecretKey (default is None. If None, will use DefaultAWSCredentialsProviderChain) :param decoder: A function used to decode value (default is utf8_decoder) + :param stsAssumeRoleArn: ARN of IAM role to assume when using STS sessions to read from + the Kinesis stream (default is None). + :param stsSessionName: Name to uniquely identify STS sessions used to read from Kinesis + stream, if STS is being used (default is None). + :param stsExternalId: External ID that can be used to validate against the assumed IAM + role's trust policy, if STS is being used (default is None). :return: A DStream object """ jlevel = ssc._sc._getJavaStorageLevel(storageLevel) @@ -81,7 +88,8 @@ class KinesisUtils(object): raise jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream, jduration, jlevel, - awsAccessKeyId, awsSecretKey) + awsAccessKeyId, awsSecretKey, stsAssumeRoleArn, + stsSessionName, stsExternalId) stream = DStream(jstream, ssc, NoOpSerializer()) return stream.map(lambda v: decoder(v)) |