aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlayaperumal Gopinathan <igopinathan@pivotal.io>2015-01-20 01:41:10 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-01-20 01:46:58 -0800
commit228bf6cefb12dc2d8cea4cc86f0591461ecb5c74 (patch)
treea400df2fd7d20947bf867d00245818517c543eef
parent6599f50dbe92255ed26fdf4640bec1fca24686db (diff)
downloadspark-228bf6cefb12dc2d8cea4cc86f0591461ecb5c74.tar.gz
spark-228bf6cefb12dc2d8cea4cc86f0591461ecb5c74.tar.bz2
spark-228bf6cefb12dc2d8cea4cc86f0591461ecb5c74.zip
[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message
- The ReceiverTracker receivers `RegisterReceiver` messages two times 1) When the actor at `ReceiverSupervisorImpl`'s preStart is invoked 2) After the receiver is started at the executor `onReceiverStart()` at `ReceiverSupervisorImpl` Though, RegisterReceiver message uses the same streamId and the receiverInfo gets updated everytime the message is processed at the `ReceiverTracker`, it makes sense to call register receiver only after the receiver is started. Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io> Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits: 868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs 3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size 634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message (cherry picked from commit 4afad9c7702239f6d5b1b49dc48ee08580964e17) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala7
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala4
2 files changed, 2 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 3b1233e86c..d7229c2b96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl(
/** Akka actor for receiving messages from the ReceiverTracker in the driver */
private val actor = env.actorSystem.actorOf(
Props(new Actor {
- override def preStart() {
- logInfo("Registered receiver " + streamId)
- val msg = RegisterReceiver(
- streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
- val future = trackerActor.ask(msg)(askTimeout)
- Await.result(future, askTimeout)
- }
override def receive() = {
case StopReceiver =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 84fed95a75..f52562b0a0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
- eventually(timeout(1000 millis), interval(20 millis)) {
- collector.startedReceiverStreamIds.size should be >= 1
+ eventually(timeout(2000 millis), interval(20 millis)) {
+ collector.startedReceiverStreamIds.size should equal (1)
collector.startedReceiverStreamIds(0) should equal (0)
collector.stoppedReceiverStreamIds should have size 1
collector.stoppedReceiverStreamIds(0) should equal (0)