aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 8f55d982a9..bd7ab0b9bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
trackerState = Stopping
if (!skipReceiverLaunch) {
// Send the stop signal to all the receivers
- endpoint.askWithRetry[Boolean](StopAllReceivers)
+ endpoint.askSync[Boolean](StopAllReceivers)
// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
@@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
// Check if all the receivers have been deregistered or not
- val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
+ val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds)
if (receivers.nonEmpty) {
logWarning("Not all of the receivers have deregistered, " + receivers)
} else {
@@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
*/
def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
if (isTrackerStarted) {
- endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+ endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
_.runningExecutor.map {
_.executorId
}