aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala38
1 files changed, 35 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 994e18676e..a5778876d4 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -63,8 +63,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
/**
- * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
- * timeout, or throw a SparkException if this fails even after the default number of retries.
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+ * default timeout, throw an exception if this fails.
+ *
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+ * loop of [[RpcEndpoint]].
+
+ * @param message the message to send
+ * @tparam T type of the reply message
+ * @return the reply message from the corresponding [[RpcEndpoint]]
+ */
+ def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+ * specified timeout, throw an exception if this fails.
+ *
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+ * loop of [[RpcEndpoint]].
+ *
+ * @param message the message to send
+ * @param timeout the timeout duration
+ * @tparam T type of the reply message
+ * @return the reply message from the corresponding [[RpcEndpoint]]
+ */
+ def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
+ val future = ask[T](message, timeout)
+ timeout.awaitResult(future)
+ }
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+ * default timeout, throw a SparkException if this fails even after the default number of retries.
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
* method retries, the message handling in the receiver side should be idempotent.
*
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
+ @deprecated("use 'askSync' instead.", "2.2.0")
def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
/**
- * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* specified timeout, throw a SparkException if this fails even after the specified number of
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
* retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
+ @deprecated("use 'askSync' instead.", "2.2.0")
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0