aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala12
1 files changed, 3 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 2761d39e37..efd26486ab 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* An exception thrown if RpcTimeout modifies a [[TimeoutException]].
@@ -72,15 +72,9 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
* is still not ready
*/
def awaitResult[T](future: Future[T]): T = {
- val wrapAndRethrow: PartialFunction[Throwable, T] = {
- case NonFatal(t) =>
- throw new SparkException("Exception thrown in awaitResult", t)
- }
try {
- // scalastyle:off awaitresult
- Await.result(future, duration)
- // scalastyle:on awaitresult
- } catch addMessageIfTimeout.orElse(wrapAndRethrow)
+ ThreadUtils.awaitResult(future, duration)
+ } catch addMessageIfTimeout
}
}