diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-22 23:05:54 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-22 23:05:54 -0700 |
commit | baa89838cca96fa091c9e5ce62be01e1a265d820 (patch) | |
tree | 69108bd5764df15e0047d5d0b1d27ca4c03579ab /extras/kinesis-asl/src | |
parent | 017b3404a50bd4b04ed73c5a69acb7b19a929822 (diff) | |
download | spark-baa89838cca96fa091c9e5ce62be01e1a265d820.tar.gz spark-baa89838cca96fa091c9e5ce62be01e1a265d820.tar.bz2 spark-baa89838cca96fa091c9e5ce62be01e1a265d820.zip |
[SPARK-7838] [STREAMING] Set scope for kinesis stream
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6369 from tdas/SPARK-7838 and squashes the following commits:
87d1c7f [Tathagata Das] Addressed comment
37775d8 [Tathagata Das] set scope for kinesis stream
Diffstat (limited to 'extras/kinesis-asl/src')
-rw-r--r-- | extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index b114bcff92..2531aebe78 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -63,9 +63,12 @@ object KinesisUtils { checkpointInterval: Duration, storageLevel: StorageLevel ): ReceiverInputDStream[Array[Byte]] = { - ssc.receiverStream( - new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, checkpointInterval, storageLevel, None)) + // Setting scope to override receiver stream's scope of "receiver stream" + ssc.withNamedScope("kinesis stream") { + ssc.receiverStream( + new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName), + initialPositionInStream, checkpointInterval, storageLevel, None)) + } } /** |