From 947b9020b0d621bc97661a0a056297e6889936d3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 19 Apr 2016 10:38:10 -0700 Subject: [SPARK-14676] Wrap and re-throw Await.result exceptions in order to capture full stacktrace When `Await.result` throws an exception which originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread. This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`. I tested this manually using https://github.com/JoshRosen/spark/commit/16b31c825197ee31a50214c6ba3c1df08148f403, a patch which reproduces an issue where an RPC exception which occurs while unpersisting RDDs manages to crash the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR. /cc rxin nongli yhuai anabranch Author: Josh Rosen Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions. --- .../sql/catalyst/expressions/CodeGenerationSuite.scala | 3 ++- .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 3 ++- .../spark/sql/execution/exchange/BroadcastExchange.scala | 5 ++--- .../org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 6 +++--- .../sql/hive/thriftserver/HiveThriftServer2Suites.scala | 15 ++++++++------- 5 files changed, 17 insertions(+), 15 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 260dfb3f42..94e676ded6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.ThreadUtils /** * Additional tests for code generation. @@ -43,7 +44,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } } - futures.foreach(Await.result(_, 10.seconds)) + futures.foreach(ThreadUtils.awaitResult(_, 10.seconds)) } test("SPARK-8443: split wide projections into blocks due to JVM code size limit") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 4091f65aec..415cd4d84a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkEnv} import org.apache.spark.internal.Logging @@ -167,7 +168,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def waitForSubqueries(): Unit = { // fill in the result of subqueries subqueryResults.foreach { case (e, futureResult) => - val rows = Await.result(futureResult, Duration.Inf) + val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf) if (rows.length > 1) { sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala index 102a9356df..a4f4213342 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.exchange -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import org.apache.spark.broadcast @@ -81,8 +81,7 @@ case class BroadcastExchange( } override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - val result = Await.result(relationFuture, timeout) - result.asInstanceOf[broadcast.Broadcast[T]] + ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index eb49eabcb1..0d0f556d9e 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import java.util.Date import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, Promise} +import scala.concurrent.Promise import scala.concurrent.duration._ import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -32,7 +32,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary @@ -132,7 +132,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start() try { - Await.result(foundAllExpectedAnswers.future, timeout) + ThreadUtils.awaitResult(foundAllExpectedAnswers.future, timeout) } catch { case cause: Throwable => val message = s""" diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index a1268b8e94..ee14b6dc8d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -24,7 +24,7 @@ import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration._ import scala.io.Source import scala.util.{Random, Try} @@ -40,7 +40,7 @@ import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer @@ -373,9 +373,10 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { // slightly more conservatively than may be strictly necessary. Thread.sleep(1000) statement.cancel() - val e = intercept[SQLException] { - Await.result(f, 3.minute) - } + val e = intercept[SparkException] { + ThreadUtils.awaitResult(f, 3.minute) + }.getCause + assert(e.isInstanceOf[SQLException]) assert(e.getMessage.contains("cancelled")) // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false @@ -391,7 +392,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { // might race and complete before we issue the cancel. Thread.sleep(1000) statement.cancel() - val rs1 = Await.result(sf, 3.minute) + val rs1 = ThreadUtils.awaitResult(sf, 3.minute) rs1.next() assert(rs1.getInt(1) === math.pow(5, 5)) rs1.close() @@ -814,7 +815,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl process } - Await.result(serverStarted.future, SERVER_STARTUP_TIMEOUT) + ThreadUtils.awaitResult(serverStarted.future, SERVER_STARTUP_TIMEOUT) } private def stopThriftServer(): Unit = { -- cgit v1.2.3