diff options
author | Ilayaperumal Gopinathan <igopinathan@pivotal.io> | 2015-01-20 01:41:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-01-20 01:46:58 -0800 |
commit | 228bf6cefb12dc2d8cea4cc86f0591461ecb5c74 (patch) | |
tree | a400df2fd7d20947bf867d00245818517c543eef | |
parent | 6599f50dbe92255ed26fdf4640bec1fca24686db (diff) | |
download | spark-228bf6cefb12dc2d8cea4cc86f0591461ecb5c74.tar.gz spark-228bf6cefb12dc2d8cea4cc86f0591461ecb5c74.tar.bz2 spark-228bf6cefb12dc2d8cea4cc86f0591461ecb5c74.zip |
[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message
- The ReceiverTracker receivers `RegisterReceiver` messages two times
1) When the actor at `ReceiverSupervisorImpl`'s preStart is invoked
2) After the receiver is started at the executor `onReceiverStart()` at `ReceiverSupervisorImpl`
Though, RegisterReceiver message uses the same streamId and the receiverInfo gets updated everytime
the message is processed at the `ReceiverTracker`, it makes sense to call register receiver only after the
receiver is started.
Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io>
Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits:
868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs
3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size
634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message
(cherry picked from commit 4afad9c7702239f6d5b1b49dc48ee08580964e17)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 7 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 4 |
2 files changed, 2 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 3b1233e86c..d7229c2b96 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl( /** Akka actor for receiving messages from the ReceiverTracker in the driver */ private val actor = env.actorSystem.actorOf( Props(new Actor { - override def preStart() { - logInfo("Registered receiver " + streamId) - val msg = RegisterReceiver( - streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self) - val future = trackerActor.ask(msg)(askTimeout) - Await.result(future, askTimeout) - } override def receive() = { case StopReceiver => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 84fed95a75..f52562b0a0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { ssc.start() try { - eventually(timeout(1000 millis), interval(20 millis)) { - collector.startedReceiverStreamIds.size should be >= 1 + eventually(timeout(2000 millis), interval(20 millis)) { + collector.startedReceiverStreamIds.size should equal (1) collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 collector.stoppedReceiverStreamIds(0) should equal (0) |