From 687273d9150e1c89a74aa1473f0c6495f56509af Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 29 Apr 2015 09:46:37 -0700 Subject: [SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask. The old naming scheme was very confusing between askWithReply and sendWithReply. I also divided RpcEnv.scala into multiple files. Author: Reynold Xin Closes #5768 from rxin/rpc-rename and squashes the following commits: a84058e [Reynold Xin] [SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask. --- .../apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 89af40330b..f2379366f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl( logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) - trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo)) + trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") } @@ -169,13 +169,13 @@ private[streaming] class ReceiverSupervisorImpl( override protected def onReceiverStart() { val msg = RegisterReceiver( streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint) - trackerEndpoint.askWithReply[Boolean](msg) + trackerEndpoint.askWithRetry[Boolean](msg) } override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") - trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString)) + trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString)) logInfo("Stopped receiver " + streamId) } -- cgit v1.2.3