aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala2
2 files changed, 7 insertions, 4 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index b114bcff92..2531aebe78 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -63,9 +63,12 @@ object KinesisUtils {
checkpointInterval: Duration,
storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]] = {
- ssc.receiverStream(
- new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, checkpointInterval, storageLevel, None))
+ // Setting scope to override receiver stream's scope of "receiver stream"
+ ssc.withNamedScope("kinesis stream") {
+ ssc.receiverStream(
+ new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, checkpointInterval, storageLevel, None))
+ }
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b77d447ce..5e58ed7148 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -262,7 +262,7 @@ class StreamingContext private[streaming] (
*
* Note: Return statements are NOT allowed in the given body.
*/
- private def withNamedScope[U](name: String)(body: => U): U = {
+ private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
}