about summary refs log tree commit diff
path: root/sql/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-04-19 10:38:10 -0700
committerReynold Xin <rxin@databricks.com>2016-04-19 10:38:10 -0700
commit947b9020b0d621bc97661a0a056297e6889936d3 (patch)
treea845b73cf950369e618cb70795c2831bd9298080 /sql/core/src/main/scala/org/apache
parentd9620e769e41541347db863907bdbd057db50823 (diff)
downloadspark-947b9020b0d621bc97661a0a056297e6889936d3.tar.gz
spark-947b9020b0d621bc97661a0a056297e6889936d3.tar.bz2
spark-947b9020b0d621bc97661a0a056297e6889936d3.zip
[SPARK-14676] Wrap and re-throw Await.result exceptions in order to capture full stacktrace
When `Await.result` throws an exception which originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread. This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`. I tested this manually using https://github.com/JoshRosen/spark/commit/16b31c825197ee31a50214c6ba3c1df08148f403, a patch which reproduces an issue where an RPC exception which occurs while unpersisting RDDs manages to crash the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR. /cc rxin nongli yhuai anabranch Author: Josh Rosen <joshrosen@databricks.com> Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala5
2 files changed, 4 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4091f65aec..415cd4d84a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.util.control.NonFatal
import org.apache.spark.{broadcast, SparkEnv}
import org.apache.spark.internal.Logging
@@ -167,7 +168,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def waitForSubqueries(): Unit = {
// fill in the result of subqueries
subqueryResults.foreach { case (e, futureResult) =>
- val rows = Await.result(futureResult, Duration.Inf)
+ val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf)
if (rows.length > 1) {
sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index 102a9356df..a4f4213342 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.exchange
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.broadcast
@@ -81,8 +81,7 @@ case class BroadcastExchange(
}
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
- val result = Await.result(relationFuture, timeout)
- result.asInstanceOf[broadcast.Broadcast[T]]
+ ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}
}