author    Bryan Cutler <bjcutler@us.ibm.com>   2015-07-02 21:38:21 -0500
committer Imran Rashid <irashid@cloudera.com>  2015-07-02 21:38:21 -0500
commit    aa7bbc143844020e4711b3aa4ce75c1b7733a80d (patch)
tree      44de121d82137380362145bc0c05288551698a76
parent    d9838196ff48faeac19756852a7f695129c08047 (diff)
[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer)

Latest changes after refactoring to the RPC layer. I rebased against trunk to make sure to get any recent changes, since it had been a while. I wasn't crazy about the name `ConfigureTimeout`, and `RpcTimeout` seemed to fit better, but I'm open to suggestions! I ran most of the tests and they pass, but others would get stuck with "WARN TaskSchedulerImpl: Initial job has not accepted any resources". I think it's just my machine, so I thought I would push what I have anyway.

Still left to do:
* I only added a couple of unit tests so far; there are probably some more cases to test
* Make sure all uses require an `RpcTimeout`
* Right now, both `ask` and `Await.result` use the same timeout; should we differentiate between these in the TimeoutException message?
* I wrapped `Await.result` in `RpcTimeout`; should we also wrap `Await.ready`?
* Proper scoping of classes and methods

hardmettle, feel free to help out with any of these!

Author: Bryan Cutler <bjcutler@us.ibm.com>
Author: Harsh Gupta <harsh@Harshs-MacBook-Pro.local>
Author: BryanCutler <cutlerb@gmail.com>

Closes #6205 from BryanCutler/configTimeout-6980 and squashes the following commits:

46c8d48 [Bryan Cutler] [SPARK-6980] Changed RpcEnvSuite test to never reply instead of just sleeping, to avoid possible sync issues
06afa53 [Bryan Cutler] [SPARK-6980] RpcTimeout class extends Serializable, was causing error in MasterSuite
7bb70f1 [Bryan Cutler] Merge branch 'master' into configTimeout-6980
dbd5f73 [Bryan Cutler] [SPARK-6980] Changed RpcUtils askRpcTimeout and lookupRpcTimeout scope to private[spark] and improved deprecation warning msg
4e89c75 [Bryan Cutler] [SPARK-6980] Missed one usage of deprecated RpcUtils.askTimeout in YarnSchedulerBackend although it is not being used, and fixed SparkConfSuite UT to not use deprecated RpcUtils functions
6a1c50d [Bryan Cutler] [SPARK-6980] Minor cleanup of test case
7f4d78e [Bryan Cutler] [SPARK-6980] Fixed scala style checks
287059a [Bryan Cutler] [SPARK-6980] Removed extra import in AkkaRpcEnvSuite
3d8b1ff [Bryan Cutler] [SPARK-6980] Cleaned up imports in AkkaRpcEnvSuite
3a168c7 [Bryan Cutler] [SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
7636189 [Bryan Cutler] [SPARK-6980] Fixed call to askWithReply in DAGScheduler to use RpcTimeout - this was being compiled by auto-tupling and changing the message type of BlockManagerHeartbeat
be11c4e [Bryan Cutler] Merge branch 'master' into configTimeout-6980
039afed [Bryan Cutler] [SPARK-6980] Corrected import organization
218aa50 [Bryan Cutler] [SPARK-6980] Corrected issues from feedback
fadaf6f [Bryan Cutler] [SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTimout to fix MiMa errors
fa6ed82 [Bryan Cutler] [SPARK-6980] Had to increase timeout on positive test case because a processor slowdown could trigger an Future TimeoutException
b05d449 [Bryan Cutler] [SPARK-6980] Changed constructor to use val duration instead of getter function, changed name of string property from conf to timeoutProp for consistency
c6cfd33 [Bryan Cutler] [SPARK-6980] Changed UT ask message timeout to explicitly intercept a SparkException
1394de6 [Bryan Cutler] [SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
1517721 [Bryan Cutler] [SPARK-6980] RpcTimeout object scope should be private[spark]
2206b4d [Bryan Cutler] [SPARK-6980] Added unit test for ask then immediat awaitReply
1b9beab [Bryan Cutler] [SPARK-6980] Cleaned up import ordering
08f5afc [Bryan Cutler] [SPARK-6980] Added UT for constructing RpcTimeout with default value
d3754d1 [Bryan Cutler] [SPARK-6980] Added akkaConf to prevent dead letter logging
995d196 [Bryan Cutler] [SPARK-6980] Cleaned up import ordering, comments, spacing from PR feedback
7774d56 [Bryan Cutler] [SPARK-6980] Cleaned up UT imports
4351c48 [Bryan Cutler] [SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
1607a5f [Bryan Cutler] [SPARK-6980] Changed addMessageIfTimeout to PartialFunction, cleanup from PR comments
2f94095 [Bryan Cutler] [SPARK-6980] Added addMessageIfTimeout for when a Future is completed with TimeoutException
235919b [Bryan Cutler] [SPARK-6980] Resolved conflicts after master merge
c07d05c [Bryan Cutler] Merge branch 'master' into configTimeout-6980-tmp
b7fb99f [BryanCutler] Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
4be3a8d [Harsh Gupta] Modifying loop condition to find property match
0ee5642 [Harsh Gupta] Changing the loop condition to halt at the first match in the property list for RpcEnv exception catch
f74064d [Harsh Gupta] Retrieving properties from property list using iterator and while loop instead of chained functions
a294569 [Bryan Cutler] [SPARK-6980] Added creation of RpcTimeout with Seq of property keys
23d2f26 [Bryan Cutler] [SPARK-6980] Fixed await result not being handled by RpcTimeout
49f9f04 [Bryan Cutler] [SPARK-6980] Minor cleanup and scala style fix
5b59a44 [Bryan Cutler] [SPARK-6980] Added some RpcTimeout unit tests
78a2c0a [Bryan Cutler] [SPARK-6980] Using RpcTimeout.awaitResult for future in AppClient now
97523e0 [Bryan Cutler] [SPARK-6980] Akka ask timeout description refactored to RPC layer
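
Editor's note: for orientation, a minimal usage sketch of the pattern this patch introduces (not part of the commit). It assumes the calling code lives under org.apache.spark, since RpcTimeout is private[spark], and uses a never-completing Future in place of an unanswered RPC ask; the package and object names are hypothetical.

package org.apache.spark.sketch  // hypothetical package, inside the private[spark] scope

import java.util.concurrent.TimeoutException

import scala.concurrent.{Future, Promise}

import org.apache.spark.SparkConf
import org.apache.spark.rpc.RpcTimeout

object RpcTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // Build an RpcTimeout from configuration; it records which property won
    // (the first set key in the list, falling back to the default value).
    val conf = new SparkConf().set("spark.rpc.askTimeout", "2s")
    val askTimeout =
      RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")

    // A Future that never completes stands in for an RPC ask that gets no reply.
    val neverAnswered: Future[String] = Promise[String]().future

    try {
      askTimeout.awaitResult(neverAnswered)
    } catch {
      case e: TimeoutException =>
        // awaitResult rethrows the timeout as an RpcTimeoutException whose message
        // ends with "... This timeout is controlled by spark.rpc.askTimeout"
        println(e.getMessage)
    }
  }
}
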
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala           |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala                     |  17
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala                             | 112
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala                    |  15
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                 |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala             |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                         |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/util/RpcUtils.scala                          |  20
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala                         |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala                        |  97
11 files changed, 258 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index b3bb5f911d..334a5b1014 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {
- private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
+ private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
initialize()
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 69181edb9a..6ae4789459 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -17,8 +17,7 @@
package org.apache.spark.rpc
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
import scala.reflect.ClassTag
import org.apache.spark.util.RpcUtils
@@ -32,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
- private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
+ private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
/**
* return the address for the [[RpcEndpointRef]]
@@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
*
* This method only sends the message once and never retries.
*/
- def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
+ def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
@@ -91,7 +90,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
- def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
+ def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
@@ -99,7 +98,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
attempts += 1
try {
val future = ask[T](message, timeout)
- val result = Await.result(future, timeout)
+ val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("Actor returned null")
}
@@ -110,10 +109,14 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}
- Thread.sleep(retryWaitMs)
+
+ if (attempts < maxRetries) {
+ Thread.sleep(retryWaitMs)
+ }
}
throw new SparkException(
s"Error sending message [message = $message]", lastException)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 3b6938ec63..1709bdf560 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -18,8 +18,10 @@
package org.apache.spark.rpc
import java.net.URI
+import java.util.concurrent.TimeoutException
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Awaitable, Await, Future}
+import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.{SecurityManager, SparkConf}
@@ -66,7 +68,7 @@ private[spark] object RpcEnv {
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {
- private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
+ private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -94,7 +96,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
- Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
+ defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}
/**
@@ -184,3 +186,107 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}
+
+
+/**
+ * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
+ */
+private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
+ extends TimeoutException(message) { initCause(cause) }
+
+
+/**
+ * Associates a timeout with a description so that when a TimeoutException occurs, additional
+ * context about the timeout can be appended to the exception message.
+ * @param duration timeout duration in seconds
+ * @param timeoutProp the configuration property that controls this timeout
+ */
+private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
+ extends Serializable {
+
+ /** Amends the standard message of TimeoutException to include the description */
+ private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
+ new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + timeoutProp, te)
+ }
+
+ /**
+ * PartialFunction to match a TimeoutException and add the timeout description to the message
+ *
+ * @note This can be used in the recover callback of a Future to add to a TimeoutException
+ * Example:
+ * val timeout = new RpcTimeout(5 millis, "short timeout")
+ * Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
+ */
+ def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
+ // The exception has already been converted to a RpcTimeoutException so just raise it
+ case rte: RpcTimeoutException => throw rte
+ // Any other TimeoutException gets converted to an RpcTimeoutException with a modified message
+ case te: TimeoutException => throw createRpcTimeoutException(te)
+ }
+
+ /**
+ * Wait for the completed result and return it. If the result is not available within this
+ * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
+ * @param awaitable the `Awaitable` to be awaited
+ * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+ * is still not ready
+ */
+ def awaitResult[T](awaitable: Awaitable[T]): T = {
+ try {
+ Await.result(awaitable, duration)
+ } catch addMessageIfTimeout
+ }
+}
+
+
+private[spark] object RpcTimeout {
+
+ /**
+ * Lookup the timeout property in the configuration and create
+ * a RpcTimeout with the property key in the description.
+ * @param conf configuration properties containing the timeout
+ * @param timeoutProp property key for the timeout in seconds
+ * @throws NoSuchElementException if property is not set
+ */
+ def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
+ val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
+ new RpcTimeout(timeout, timeoutProp)
+ }
+
+ /**
+ * Lookup the timeout property in the configuration and create
+ * a RpcTimeout with the property key in the description.
+ * Uses the given default value if property is not set
+ * @param conf configuration properties containing the timeout
+ * @param timeoutProp property key for the timeout in seconds
+ * @param defaultValue default timeout value in seconds if property not found
+ */
+ def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
+ val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
+ new RpcTimeout(timeout, timeoutProp)
+ }
+
+ /**
+ * Lookup prioritized list of timeout properties in the configuration
+ * and create a RpcTimeout with the first set property key in the
+ * description.
+ * Uses the given default value if property is not set
+ * @param conf configuration properties containing the timeout
+ * @param timeoutPropList prioritized list of property keys for the timeout in seconds
+ * @param defaultValue default timeout value in seconds if no properties found
+ */
+ def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
+ require(timeoutPropList.nonEmpty)
+
+ // Find the first set property or use the default value with the first property
+ val itr = timeoutPropList.iterator
+ var foundProp: Option[(String, String)] = None
+ while (itr.hasNext && foundProp.isEmpty){
+ val propKey = itr.next()
+ conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
+ }
+ val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
+ val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
+ new RpcTimeout(timeout, finalProp._1)
+ }
+}
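
Editor's note: a short, hedged illustration (a fragment, not part of the patch) of addMessageIfTimeout as defined above. It assumes code under org.apache.spark so the private[spark] class is visible. The partial function is installed as a Future recover callback: a plain TimeoutException is rewritten into an RpcTimeoutException naming timeoutProp, while an already-converted RpcTimeoutException is rethrown unchanged, so the description is added at most once.

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.rpc.RpcTimeout

val timeout = new RpcTimeout(10.millis, "spark.rpc.short.timeout")

// The recover callback rewrites a TimeoutException raised inside the Future.
val fut: Future[String] =
  Future[String] { throw new TimeoutException("Futures timed out") }
    .recover(timeout.addMessageIfTimeout)

try Await.result(fut, 1.second)
catch {
  case e: TimeoutException =>
    // "Futures timed out. This timeout is controlled by spark.rpc.short.timeout"
    println(e.getMessage)
}
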
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index 31ebe5ac5b..f2d87f6834 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
-import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -214,8 +213,11 @@ private[spark] class AkkaRpcEnv private[akka] (
override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
import actorSystem.dispatcher
- actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
- map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
+ actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration).
+ map(new AkkaRpcEndpointRef(defaultAddress, _, conf)).
+ // this is just in case there is a timeout from creating the future in resolveOne, we want the
+ // exception to indicate the conf that determines the timeout
+ recover(defaultLookupTimeout.addMessageIfTimeout)
}
override def uriOf(systemName: String, address: RpcAddress, endpointName: String): String = {
@@ -295,8 +297,8 @@ private[akka] class AkkaRpcEndpointRef(
actorRef ! AkkaMessage(message, false)
}
- override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
- actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
+ override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+ actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
// The function will run in the calling thread, so it should be short and never block.
case msg @ AkkaMessage(message, reply) =>
if (reply) {
@@ -307,7 +309,8 @@ private[akka] class AkkaRpcEndpointRef(
}
case AkkaFailure(e) =>
Future.failed(e)
- }(ThreadUtils.sameThread).mapTo[T]
+ }(ThreadUtils.sameThread).mapTo[T].
+ recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
override def toString: String = s"${getClass.getSimpleName}($actorRef)"
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a7cf0c23d9..6841fa8357 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
@@ -188,7 +189,7 @@ class DAGScheduler(
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
- BlockManagerHeartbeat(blockManagerId), 600 seconds)
+ BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
// Called by TaskScheduler when an executor fails.
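
Editor's note: the DAGScheduler change above shows the other way to obtain an RpcTimeout. When no configuration property governs the wait, a fixed duration is paired with a plain descriptive label that only feeds the error message. A one-line hedged sketch (duration and label are illustrative):

import scala.concurrent.duration._
import org.apache.spark.rpc.RpcTimeout

// The second argument is just a description for the timeout message;
// here it names the operation rather than a SparkConf key.
val heartbeatTimeout = new RpcTimeout(600.seconds, "BlockManagerHeartbeat")
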
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 190ff61d68..bc67abb5df 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
- private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)
+ private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
/**
* Request executors from the ApplicationMaster by specifying the total number desired.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7cdae22b0e..f70f701494 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -33,7 +33,7 @@ class BlockManagerMaster(
isDriver: Boolean)
extends Logging {
- val timeout = RpcUtils.askTimeout(conf)
+ val timeout = RpcUtils.askRpcTimeout(conf)
/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
@@ -106,7 +106,7 @@ class BlockManagerMaster(
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
- Await.result(future, timeout)
+ timeout.awaitResult(future)
}
}
@@ -118,7 +118,7 @@ class BlockManagerMaster(
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
- Await.result(future, timeout)
+ timeout.awaitResult(future)
}
}
@@ -132,7 +132,7 @@ class BlockManagerMaster(
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
- Await.result(future, timeout)
+ timeout.awaitResult(future)
}
}
@@ -176,8 +176,8 @@ class BlockManagerMaster(
CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
Option[BlockStatus],
Iterable[Option[BlockStatus]]]]
- val blockStatus = Await.result(
- Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+ val blockStatus = timeout.awaitResult(
+ Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
if (blockStatus == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
@@ -199,7 +199,7 @@ class BlockManagerMaster(
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
- Await.result(future, timeout)
+ timeout.awaitResult(future)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 96aa2fe164..c179833e5b 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -18,8 +18,6 @@
package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
-import scala.concurrent.Await
-import scala.concurrent.duration.FiniteDuration
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -28,6 +26,7 @@ import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.rpc.RpcTimeout
/**
* Various utility classes for working with Akka.
@@ -147,7 +146,7 @@ private[spark] object AkkaUtils extends Logging {
def askWithReply[T](
message: Any,
actor: ActorRef,
- timeout: FiniteDuration): T = {
+ timeout: RpcTimeout): T = {
askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
}
@@ -160,7 +159,7 @@ private[spark] object AkkaUtils extends Logging {
actor: ActorRef,
maxAttempts: Int,
retryInterval: Long,
- timeout: FiniteDuration): T = {
+ timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
if (actor == null) {
throw new SparkException(s"Error sending message [message = $message]" +
@@ -171,8 +170,8 @@ private[spark] object AkkaUtils extends Logging {
while (attempts < maxAttempts) {
attempts += 1
try {
- val future = actor.ask(message)(timeout)
- val result = Await.result(future, timeout)
+ val future = actor.ask(message)(timeout.duration)
+ val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("Actor returned null")
}
@@ -198,9 +197,9 @@ private[spark] object AkkaUtils extends Logging {
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
- val timeout = RpcUtils.lookupTimeout(conf)
+ val timeout = RpcUtils.lookupRpcTimeout(conf)
logInfo(s"Connecting to $name: $url")
- Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+ timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
}
def makeExecutorRef(
@@ -212,9 +211,9 @@ private[spark] object AkkaUtils extends Logging {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
- val timeout = RpcUtils.lookupTimeout(conf)
+ val timeout = RpcUtils.lookupRpcTimeout(conf)
logInfo(s"Connecting to $name: $url")
- Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+ timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
}
def protocol(actorSystem: ActorSystem): String = {
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index f16cc8e7e4..7578a3b1d8 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,11 +17,11 @@
package org.apache.spark.util
-import scala.concurrent.duration._
+import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
object RpcUtils {
@@ -47,14 +47,22 @@ object RpcUtils {
}
/** Returns the default Spark timeout to use for RPC ask operations. */
+ private[spark] def askRpcTimeout(conf: SparkConf): RpcTimeout = {
+ RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
+ }
+
+ @deprecated("use askRpcTimeout instead, this method was not intended to be public", "1.5.0")
def askTimeout(conf: SparkConf): FiniteDuration = {
- conf.getTimeAsSeconds("spark.rpc.askTimeout",
- conf.get("spark.network.timeout", "120s")) seconds
+ askRpcTimeout(conf).duration
}
/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
+ private[spark] def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
+ RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
+ }
+
+ @deprecated("use lookupRpcTimeout instead, this method was not intended to be public", "1.5.0")
def lookupTimeout(conf: SparkConf): FiniteDuration = {
- conf.getTimeAsSeconds("spark.rpc.lookupTimeout",
- conf.get("spark.network.timeout", "120s")) seconds
+ lookupRpcTimeout(conf).duration
}
}
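
Editor's note: a small migration sketch for callers of the deprecated helpers (hypothetical caller, not part of the patch; it must live under org.apache.spark because askRpcTimeout and lookupRpcTimeout are private[spark]).

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

val conf = new SparkConf()

// Deprecated since 1.5.0: returns only the bare FiniteDuration.
val oldStyle = RpcUtils.askTimeout(conf)

// Preferred: returns an RpcTimeout that also remembers the controlling
// property (spark.rpc.askTimeout, falling back to spark.network.timeout).
val newStyle = RpcUtils.askRpcTimeout(conf)
assert(oldStyle == newStyle.duration)
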
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 9fbaeb33f9..90cb7da94e 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -260,10 +260,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(RpcUtils.retryWaitMs(conf) === 2L)
conf.set("spark.akka.askTimeout", "3")
- assert(RpcUtils.askTimeout(conf) === (3 seconds))
+ assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
conf.set("spark.akka.lookupTimeout", "4")
- assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+ assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
}
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 1f0aa759b0..6ceafe4337 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,16 +155,21 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val conf = new SparkConf()
+ val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try {
- val e = intercept[Exception] {
- rpcEndpointRef.askWithRetry[String]("hello", 1 millis)
+ // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
+ val e = intercept[SparkException] {
+ rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
}
- assert(e.isInstanceOf[TimeoutException] || e.getCause.isInstanceOf[TimeoutException])
+ // The SparkException cause should be a RpcTimeoutException with message indicating the
+ // controlling timeout property
+ assert(e.getCause.isInstanceOf[RpcTimeoutException])
+ assert(e.getCause.getMessage.contains(shortProp))
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
@@ -539,6 +544,92 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
+ test("construct RpcTimeout with conf property") {
+ val conf = new SparkConf
+
+ val testProp = "spark.ask.test.timeout"
+ val testDurationSeconds = 30
+ val secondaryProp = "spark.ask.secondary.timeout"
+
+ conf.set(testProp, s"${testDurationSeconds}s")
+ conf.set(secondaryProp, "100s")
+
+ // Construct RpcTimeout with a single property
+ val rt1 = RpcTimeout(conf, testProp)
+ assert( testDurationSeconds === rt1.duration.toSeconds )
+
+ // Construct RpcTimeout with prioritized list of properties
+ val rt2 = RpcTimeout(conf, Seq("spark.ask.invalid.timeout", testProp, secondaryProp), "1s")
+ assert( testDurationSeconds === rt2.duration.toSeconds )
+
+ // Construct RpcTimeout with default value,
+ val defaultProp = "spark.ask.default.timeout"
+ val defaultDurationSeconds = 1
+ val rt3 = RpcTimeout(conf, Seq(defaultProp), defaultDurationSeconds.toString + "s")
+ assert( defaultDurationSeconds === rt3.duration.toSeconds )
+ assert( rt3.timeoutProp.contains(defaultProp) )
+
+ // Try to construct RpcTimeout with an unconfigured property
+ intercept[NoSuchElementException] {
+ RpcTimeout(conf, "spark.ask.invalid.timeout")
+ }
+ }
+
+ test("ask a message timeout on Future using RpcTimeout") {
+ case class NeverReply(msg: String)
+
+ val rpcEndpointRef = env.setupEndpoint("ask-future", new RpcEndpoint {
+ override val rpcEnv = env
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case msg: String => context.reply(msg)
+ case _: NeverReply =>
+ }
+ })
+
+ val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
+ val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
+
+ // Ask with immediate response, should complete successfully
+ val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
+ val reply1 = longTimeout.awaitResult(fut1)
+ assert("hello" === reply1)
+
+ // Ask with a delayed response and wait for response immediately that should timeout
+ val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout)
+ val reply2 =
+ intercept[RpcTimeoutException] {
+ shortTimeout.awaitResult(fut2)
+ }.getMessage
+
+ // RpcTimeout.awaitResult should have added the property to the TimeoutException message
+ assert(reply2.contains(shortTimeout.timeoutProp))
+
+ // Ask with delayed response and allow the Future to timeout before Await.result
+ val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
+
+ // Allow future to complete with failure using plain Await.result, this will return
+ // once the future is complete to verify addMessageIfTimeout was invoked
+ val reply3 =
+ intercept[RpcTimeoutException] {
+ Await.result(fut3, 200 millis)
+ }.getMessage
+
+ // When the future timed out, the recover callback should have used
+ // RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
+ assert(reply3.contains(shortTimeout.timeoutProp))
+
+ // Use RpcTimeout.awaitResult to process Future, since it has already failed with
+ // RpcTimeoutException, the same RpcTimeoutException should be thrown
+ val reply4 =
+ intercept[RpcTimeoutException] {
+ shortTimeout.awaitResult(fut3)
+ }.getMessage
+
+ // Ensure description is not in message twice after addMessageIfTimeout and awaitResult
+ assert(shortTimeout.timeoutProp.r.findAllIn(reply4).length === 1)
+ }
+
}
class UnserializableClass