diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 8f55d982a9..bd7ab0b9bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false trackerState = Stopping if (!skipReceiverLaunch) { // Send the stop signal to all the receivers - endpoint.askWithRetry[Boolean](StopAllReceivers) + endpoint.askSync[Boolean](StopAllReceivers) // Wait for the Spark job that runs the receivers to be over // That is, for the receivers to quit gracefully. @@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } // Check if all the receivers have been deregistered or not - val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds) + val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds) if (receivers.nonEmpty) { logWarning("Not all of the receivers have deregistered, " + receivers) } else { @@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false */ def allocatedExecutors(): Map[Int, Option[String]] = synchronized { if (isTrackerStarted) { - endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { + endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { _.runningExecutor.map { _.executorId } |