aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorIlayaperumal Gopinathan <igopinathan@pivotal.io>2014-12-23 15:14:54 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-12-23 15:14:54 -0800
commit10d69e9cbfdabe95d0e513176d5347d7b59da0ee (patch)
treee1e0b4d4115a0b14e579a4fb0beba64c0a7041c2 /streaming
parent96281cd0c3ffb4c0fbdb64b8f7b705f863d46b33 (diff)
downloadspark-10d69e9cbfdabe95d0e513176d5347d7b59da0ee.tar.gz
spark-10d69e9cbfdabe95d0e513176d5347d7b59da0ee.tar.bz2
spark-10d69e9cbfdabe95d0e513176d5347d7b59da0ee.zip
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`. Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io> Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits: 6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review 3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 32e481dabc..1f0e442a12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
- receiverInfo(streamId) = newReceiverInfo
- listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
+ receiverInfo -= streamId
+ listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {