From 228bf6cefb12dc2d8cea4cc86f0591461ecb5c74 Mon Sep 17 00:00:00 2001 From: Ilayaperumal Gopinathan Date: Tue, 20 Jan 2015 01:41:10 -0800 Subject: [SPARK-4803] [streaming] Remove duplicate RegisterReceiver message - The ReceiverTracker receivers `RegisterReceiver` messages two times 1) When the actor at `ReceiverSupervisorImpl`'s preStart is invoked 2) After the receiver is started at the executor `onReceiverStart()` at `ReceiverSupervisorImpl` Though, RegisterReceiver message uses the same streamId and the receiverInfo gets updated everytime the message is processed at the `ReceiverTracker`, it makes sense to call register receiver only after the receiver is started. Author: Ilayaperumal Gopinathan Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits: 868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs 3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size 634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message (cherry picked from commit 4afad9c7702239f6d5b1b49dc48ee08580964e17) Signed-off-by: Tathagata Das --- .../apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 7 ------- .../scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 4 ++-- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 3b1233e86c..d7229c2b96 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl( /** Akka actor for receiving messages from the ReceiverTracker in the driver */ private val actor = env.actorSystem.actorOf( Props(new Actor { - override def preStart() { - logInfo("Registered receiver " + streamId) - val msg = RegisterReceiver( - streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self) - val future = trackerActor.ask(msg)(askTimeout) - Await.result(future, askTimeout) - } override def receive() = { case StopReceiver => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 84fed95a75..f52562b0a0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { ssc.start() try { - eventually(timeout(1000 millis), interval(20 millis)) { - collector.startedReceiverStreamIds.size should be >= 1 + eventually(timeout(2000 millis), interval(20 millis)) { + collector.startedReceiverStreamIds.size should equal (1) collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 collector.stoppedReceiverStreamIds(0) should equal (0) -- cgit v1.2.3