diff options
author | Ilayaperumal Gopinathan <igopinathan@pivotal.io> | 2014-12-23 15:14:54 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-12-23 15:14:54 -0800 |
commit | 10d69e9cbfdabe95d0e513176d5347d7b59da0ee (patch) | |
tree | e1e0b4d4115a0b14e579a4fb0beba64c0a7041c2 /streaming/src | |
parent | 96281cd0c3ffb4c0fbdb64b8f7b705f863d46b33 (diff) | |
download | spark-10d69e9cbfdabe95d0e513176d5347d7b59da0ee.tar.gz spark-10d69e9cbfdabe95d0e513176d5347d7b59da0ee.tar.bz2 spark-10d69e9cbfdabe95d0e513176d5347d7b59da0ee.zip |
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to
remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`.
Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io>
Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:
6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 32e481dabc..1f0e442a12 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logWarning("No prior receiver info") ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) } - receiverInfo(streamId) = newReceiverInfo - listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId))) + receiverInfo -= streamId + listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo)) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { |