diff options
author | Ilayaperumal Gopinathan <igopinathan@pivotal.io> | 2014-12-23 15:14:54 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-12-23 15:15:10 -0800 |
commit | 01adf45a9b2e0264ee4571dd51c04a57126b666a (patch) | |
tree | 89190597e725869fed018561df1829bdb6d87d73 | |
parent | 6a46cc3c83592a8d8e2ae4e6b44c26e39df1e340 (diff) | |
download | spark-01adf45a9b2e0264ee4571dd51c04a57126b666a.tar.gz spark-01adf45a9b2e0264ee4571dd51c04a57126b666a.tar.bz2 spark-01adf45a9b2e0264ee4571dd51c04a57126b666a.zip |
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to
remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`.
Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io>
Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:
6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered
(cherry picked from commit 10d69e9cbfdabe95d0e513176d5347d7b59da0ee)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 32e481dabc..1f0e442a12 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logWarning("No prior receiver info") ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) } - receiverInfo(streamId) = newReceiverInfo - listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId))) + receiverInfo -= streamId + listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo)) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { |