aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-16 13:45:55 -0500
committerReynold Xin <rxin@databricks.com>2015-04-16 13:45:55 -0500
commitef3fb801ae971656ed9cd1b0ab95bc5a1548adbd (patch)
treee5d559cc39ec9c1f30bf67625513d2a9579660d6 /core
parent3ae37b93a7c299bd8b22a36248035bca5de3422f (diff)
downloadspark-ef3fb801ae971656ed9cd1b0ab95bc5a1548adbd.tar.gz
spark-ef3fb801ae971656ed9cd1b0ab95bc5a1548adbd.tar.bz2
spark-ef3fb801ae971656ed9cd1b0ab95bc5a1548adbd.zip
[SPARK-6934][Core] Use 'spark.akka.askTimeout' for the ask timeout
Fixed my mistake in #4588 Author: zsxwing <zsxwing@gmail.com> Closes #5529 from zsxwing/SPARK-6934 and squashes the following commits: 9890b2d [zsxwing] Use 'spark.akka.askTimeout' for the ask timeout
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index e259867c14..f2c1c86af7 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -284,7 +284,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3)
private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000)
- private[this] val defaultTimeout = conf.getLong("spark.akka.lookupTimeout", 30) seconds
+ private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds
/**
* return the address for the [[RpcEndpointRef]]
@@ -304,7 +304,8 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
*
* This method only sends the message once and never retries.
*/
- def sendWithReply[T: ClassTag](message: Any): Future[T] = sendWithReply(message, defaultTimeout)
+ def sendWithReply[T: ClassTag](message: Any): Future[T] =
+ sendWithReply(message, defaultAskTimeout)
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
@@ -327,7 +328,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
- def askWithReply[T: ClassTag](message: Any): T = askWithReply(message, defaultTimeout)
+ def askWithReply[T: ClassTag](message: Any): T = askWithReply(message, defaultAskTimeout)
/**
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a