aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala10
1 files changed, 10 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b9d898a723..8f55d982a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
+ } else if (isTrackerInitialized) {
+ trackerState = Stopping
+ // `ReceivedBlockTracker` is open when this instance is created. We should
+ // close this even if this `ReceiverTracker` is not started.
+ receivedBlockTracker.stop()
+ logInfo("ReceiverTracker stopped")
+ trackerState = Stopped
}
}
@@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
endpoint.send(StartAllReceivers(receivers))
}
+ /** Check if tracker has been marked for initiated */
+ private def isTrackerInitialized: Boolean = trackerState == Initialized
+
/** Check if tracker has been marked for starting */
private def isTrackerStarted: Boolean = trackerState == Started