aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorIlayaperumal Gopinathan <igopinathan@pivotal.io>2014-12-23 15:14:54 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-12-23 15:15:10 -0800
commit01adf45a9b2e0264ee4571dd51c04a57126b666a (patch)
tree89190597e725869fed018561df1829bdb6d87d73 /streaming
parent6a46cc3c83592a8d8e2ae4e6b44c26e39df1e340 (diff)
downloadspark-01adf45a9b2e0264ee4571dd51c04a57126b666a.tar.gz
spark-01adf45a9b2e0264ee4571dd51c04a57126b666a.tar.bz2
spark-01adf45a9b2e0264ee4571dd51c04a57126b666a.zip
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`. Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io> Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits: 6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review 3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered (cherry picked from commit 10d69e9cbfdabe95d0e513176d5347d7b59da0ee) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 32e481dabc..1f0e442a12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
- receiverInfo(streamId) = newReceiverInfo
- listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
+ receiverInfo -= streamId
+ listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {