diff options
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 3 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala | 5 |
2 files changed, 4 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 4091f65aec..415cd4d84a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkEnv} import org.apache.spark.internal.Logging @@ -167,7 +168,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def waitForSubqueries(): Unit = { // fill in the result of subqueries subqueryResults.foreach { case (e, futureResult) => - val rows = Await.result(futureResult, Duration.Inf) + val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf) if (rows.length > 1) { sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala index 102a9356df..a4f4213342 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.exchange -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import org.apache.spark.broadcast @@ -81,8 +81,7 @@ case class BroadcastExchange( } override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - val result = Await.result(relationFuture, timeout) - result.asInstanceOf[broadcast.Broadcast[T]] + ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] } } |