aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala5
2 files changed, 4 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4091f65aec..415cd4d84a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.util.control.NonFatal
import org.apache.spark.{broadcast, SparkEnv}
import org.apache.spark.internal.Logging
@@ -167,7 +168,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def waitForSubqueries(): Unit = {
// fill in the result of subqueries
subqueryResults.foreach { case (e, futureResult) =>
- val rows = Await.result(futureResult, Duration.Inf)
+ val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf)
if (rows.length > 1) {
sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index 102a9356df..a4f4213342 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.exchange
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.broadcast
@@ -81,8 +81,7 @@ case class BroadcastExchange(
}
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
- val result = Await.result(relationFuture, timeout)
- result.asInstanceOf[broadcast.Broadcast[T]]
+ ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}
}