aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala7
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala4
2 files changed, 2 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 3b1233e86c..d7229c2b96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl(
/** Akka actor for receiving messages from the ReceiverTracker in the driver */
private val actor = env.actorSystem.actorOf(
Props(new Actor {
- override def preStart() {
- logInfo("Registered receiver " + streamId)
- val msg = RegisterReceiver(
- streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
- val future = trackerActor.ask(msg)(askTimeout)
- Await.result(future, askTimeout)
- }
override def receive() = {
case StopReceiver =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 84fed95a75..f52562b0a0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
- eventually(timeout(1000 millis), interval(20 millis)) {
- collector.startedReceiverStreamIds.size should be >= 1
+ eventually(timeout(2000 millis), interval(20 millis)) {
+ collector.startedReceiverStreamIds.size should equal (1)
collector.startedReceiverStreamIds(0) should equal (0)
collector.stoppedReceiverStreamIds should have size 1
collector.stoppedReceiverStreamIds(0) should equal (0)