aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 89af40330b..f2379366f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
- trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
+ trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
@@ -169,13 +169,13 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
- trackerEndpoint.askWithReply[Boolean](msg)
+ trackerEndpoint.askWithRetry[Boolean](msg)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
- trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
+ trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}