aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-04-29 09:46:37 -0700
committerReynold Xin <rxin@databricks.com>2015-04-29 09:46:37 -0700
commit687273d9150e1c89a74aa1473f0c6495f56509af (patch)
tree5f58e6c27ca6b8feddeb2c22d5558fd7dc180257 /streaming
parentbaed3f2c73afd9c7d9de34f0485c507ac6a498b0 (diff)
downloadspark-687273d9150e1c89a74aa1473f0c6495f56509af.tar.gz
spark-687273d9150e1c89a74aa1473f0c6495f56509af.tar.bz2
spark-687273d9150e1c89a74aa1473f0c6495f56509af.zip
[SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask.
The old naming scheme was very confusing between askWithReply and sendWithReply. I also divided RpcEnv.scala into multiple files. Author: Reynold Xin <rxin@databricks.com> Closes #5768 from rxin/rpc-rename and squashes the following commits: a84058e [Reynold Xin] [SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 89af40330b..f2379366f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
- trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
+ trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
@@ -169,13 +169,13 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
- trackerEndpoint.askWithReply[Boolean](msg)
+ trackerEndpoint.askWithRetry[Boolean](msg)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
- trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
+ trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}