aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-31 12:19:11 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-31 12:19:11 -0700
commit4a5fe091658b1d06f427e404a11a84fc84f953c5 (patch)
tree42f7a7060d5d4c62302824c7b0f329efc7bce01b /streaming/src/main
parent72f6dbf7b0c8b271f5f9c762374422c69c8ab43d (diff)
downloadspark-4a5fe091658b1d06f427e404a11a84fc84f953c5.tar.gz
spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.tar.bz2
spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.zip
[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later
`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it. Author: zsxwing <zsxwing@gmail.com> Closes #8538 from zsxwing/SPARK-10369.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a675d..f86fd44b48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ReceiverTrackingInfo(
streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
}
- receiverTrackingInfos -= streamId
+ receiverTrackingInfos(streamId) = newReceiverTrackingInfo
listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
context.reply(true)
// Local messages
case AllReceiverIds =>
- context.reply(receiverTrackingInfos.keys.toSeq)
+ context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
case StopAllReceivers =>
assert(isTrackerStopping || isTrackerStopped)
stopReceivers()