diff options
author | Ilayaperumal Gopinathan <igopinathan@pivotal.io> | 2015-01-20 01:41:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-01-20 01:41:10 -0800 |
commit | 4afad9c7702239f6d5b1b49dc48ee08580964e17 (patch) | |
tree | 2542333741008c30732b55a0f0503785f617237b /streaming/src | |
parent | debc03195302789b022e329ce6ade5d7fe225a3c (diff) | |
download | spark-4afad9c7702239f6d5b1b49dc48ee08580964e17.tar.gz spark-4afad9c7702239f6d5b1b49dc48ee08580964e17.tar.bz2 spark-4afad9c7702239f6d5b1b49dc48ee08580964e17.zip |
[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message
- The ReceiverTracker receivers `RegisterReceiver` messages two times
1) When the actor at `ReceiverSupervisorImpl`'s preStart is invoked
2) After the receiver is started at the executor `onReceiverStart()` at `ReceiverSupervisorImpl`
Though, RegisterReceiver message uses the same streamId and the receiverInfo gets updated everytime
the message is processed at the `ReceiverTracker`, it makes sense to call register receiver only after the
receiver is started.
Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io>
Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits:
868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs
3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size
634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 7 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 4 |
2 files changed, 2 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 3b1233e86c..d7229c2b96 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl( /** Akka actor for receiving messages from the ReceiverTracker in the driver */ private val actor = env.actorSystem.actorOf( Props(new Actor { - override def preStart() { - logInfo("Registered receiver " + streamId) - val msg = RegisterReceiver( - streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self) - val future = trackerActor.ask(msg)(askTimeout) - Await.result(future, askTimeout) - } override def receive() = { case StopReceiver => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 84fed95a75..f52562b0a0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { ssc.start() try { - eventually(timeout(1000 millis), interval(20 millis)) { - collector.startedReceiverStreamIds.size should be >= 1 + eventually(timeout(2000 millis), interval(20 millis)) { + collector.startedReceiverStreamIds.size should equal (1) collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 collector.stoppedReceiverStreamIds(0) should equal (0) |