diff options
author | Reynold Xin <rxin@databricks.com> | 2015-04-29 09:46:37 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-29 09:46:37 -0700 |
commit | 687273d9150e1c89a74aa1473f0c6495f56509af (patch) | |
tree | 5f58e6c27ca6b8feddeb2c22d5558fd7dc180257 /streaming/src | |
parent | baed3f2c73afd9c7d9de34f0485c507ac6a498b0 (diff) | |
download | spark-687273d9150e1c89a74aa1473f0c6495f56509af.tar.gz spark-687273d9150e1c89a74aa1473f0c6495f56509af.tar.bz2 spark-687273d9150e1c89a74aa1473f0c6495f56509af.zip |
[SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask.
The old naming scheme was very confusing between askWithReply and sendWithReply. I also divided RpcEnv.scala into multiple files.
Author: Reynold Xin <rxin@databricks.com>
Closes #5768 from rxin/rpc-rename and squashes the following commits:
a84058e [Reynold Xin] [SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 89af40330b..f2379366f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl( logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) - trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo)) + trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") } @@ -169,13 +169,13 @@ private[streaming] class ReceiverSupervisorImpl( override protected def onReceiverStart() { val msg = RegisterReceiver( streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint) - trackerEndpoint.askWithReply[Boolean](msg) + trackerEndpoint.askWithRetry[Boolean](msg) } override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") - trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString)) + trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString)) logInfo("Stopped receiver " + streamId) } |