From 947b9020b0d621bc97661a0a056297e6889936d3 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 19 Apr 2016 10:38:10 -0700
Subject: [SPARK-14676] Wrap and re-throw Await.result exceptions in order to
 capture full stacktrace

When `Await.result` throws an exception which originated from a different
thread, the resulting stacktrace doesn't include the path leading to the
`Await.result` call itself, making it difficult to identify the impact of
these exceptions. For example, I've seen cases where broadcast cleaning
errors propagate to the main thread and crash it but the resulting
stacktrace doesn't include any of the main thread's code, making it
difficult to pinpoint which exception crashed that thread.

This patch addresses this issue by explicitly catching, wrapping, and
re-throwing exceptions that are thrown by `Await.result`.

I tested this manually using
https://github.com/JoshRosen/spark/commit/16b31c825197ee31a50214c6ba3c1df08148f403,
a patch which reproduces an issue where an RPC exception which occurs while
unpersisting RDDs manages to crash the main thread without any useful
stacktrace, and verified that informative, full stacktraces were generated
after applying the fix in this PR.

/cc rxin nongli yhuai anabranch

Author: Josh Rosen

Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions.
---
 .../main/scala/org/apache/spark/FutureAction.scala |  7 ++++--
 .../apache/spark/deploy/FaultToleranceTest.scala   | 10 ++++-----
 .../org/apache/spark/deploy/master/Master.scala    |  4 ++--
 .../spark/deploy/rest/RestSubmissionClient.scala   |  7 +++++-
 .../spark/network/BlockTransferService.scala       |  8 +++----
 .../scala/org/apache/spark/rpc/RpcTimeout.scala    | 25 ++++++++++++++++------
 .../org/apache/spark/storage/BlockManager.scala    | 14 ++++++++++--
 .../scala/org/apache/spark/util/ThreadUtils.scala  | 22 ++++++++++++++++++-
 .../main/scala/org/apache/spark/util/Utils.scala   |  2 ++
 9 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 339266a5d4..a50600f148 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.JobWaiter
+import org.apache.spark.util.ThreadUtils
 
 
 /**
@@ -45,6 +46,7 @@ trait FutureAction[T] extends Future[T] {
 
   /**
    * Blocks until this action completes.
+   *
    * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
    *               for unbounded waiting, or a finite positive duration
    * @return this FutureAction
@@ -53,6 +55,7 @@ trait FutureAction[T] extends Future[T] {
 
   /**
    * Awaits and returns the result (of type T) of this action.
+   *
    * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
    *               for unbounded waiting, or a finite positive duration
    * @throws Exception exception during action execution
@@ -89,8 +92,8 @@ trait FutureAction[T] extends Future[T] {
   /**
    * Blocks and returns the result of this job.
    */
-  @throws(classOf[Exception])
-  def get(): T = Await.result(this, Duration.Inf)
+  @throws(classOf[SparkException])
+  def get(): T = ThreadUtils.awaitResult(this, Duration.Inf)
 
   /**
    * Returns the job IDs run by the underlying async operation.
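
In a nutshell, the whole change reduces to the small pattern sketched below.
This is a standalone illustration, not code from the patch: `RuntimeException`
stands in for Spark's `SparkException`, and the demo future is made up. The
point is that the wrapper exception is constructed on the waiting thread, so
its stack trace records the blocked caller, while the original failure
survives as the cause.

import scala.concurrent.{Await, Awaitable, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object WrapAndRethrowSketch {
  // Mirrors the ThreadUtils.awaitResult helper introduced below, with
  // RuntimeException standing in for SparkException.
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      Await.result(awaitable, atMost)
    } catch {
      // NonFatal deliberately excludes e.g. OutOfMemoryError and
      // InterruptedException, which should propagate untouched.
      case NonFatal(t) =>
        throw new RuntimeException("Exception thrown in awaitResult: ", t)
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // The failure happens on a pool thread, not on main.
    val failing: Future[Int] = Future { throw new IllegalStateException("boom") }
    try {
      awaitResult(failing, 10.seconds)
    } catch {
      // Top frames point at main; the pool-thread failure is the cause.
      case e: RuntimeException => e.printStackTrace()
    }
  }
}
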
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index abb98f95a1..79f4d06c84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeoutException
 
 import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -35,7 +35,7 @@ import org.json4s.jackson.JsonMethods
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.master.RecoveryState
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -265,7 +265,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     // Avoid waiting indefinitely (e.g., we could register but get no executors).
-    assertTrue(Await.result(f, 120 seconds))
+    assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
   }
 
   /**
@@ -318,7 +318,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     try {
-      assertTrue(Await.result(f, 120 seconds))
+      assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
     } catch {
       case e: TimeoutException =>
         logError("Master states: " + masters.map(_.state))
@@ -422,7 +422,7 @@ private object SparkDocker {
     }
 
     dockerCmd.run(ProcessLogger(findIpAndLog _))
-    val ip = Await.result(ipPromise.future, 30 seconds)
+    val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
     val dockerId = Docker.getLastProcessId
     (ip, dockerId, outFile)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b443e8f051..edc9be2a8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -24,7 +24,7 @@ import java.util.Date
 import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.language.postfixOps
 import scala.util.Random
@@ -959,7 +959,7 @@ private[deploy] class Master(
    */
   private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
     val futureUI = asyncRebuildSparkUI(app)
-    Await.result(futureUI, Duration.Inf)
+    ThreadUtils.awaitResult(futureUI, Duration.Inf)
   }
 
   /** Rebuild a new SparkUI asynchronously to not block RPC event loop */
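
The SparkDocker change above is the motivating shape for the new helper: a
`Promise` completed from a process-logger callback running on another thread.
A standalone sketch of that shape follows — all names here are illustrative,
not from Spark:

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object PromiseFromCallbackSketch {
  def main(args: Array[String]): Unit = {
    val ipPromise = Promise[String]()

    // Stands in for ProcessLogger invoking a callback on another thread.
    val callbackThread = new Thread(new Runnable {
      def run(): Unit =
        ipPromise.tryFailure(new RuntimeException("could not parse container IP"))
    })
    callbackThread.start()

    // A failed Future rethrows the *original* exception object, whose stack
    // trace was captured on the callback thread; wrapping restores the
    // awaiting thread's frames on top of it.
    try {
      Await.result(ipPromise.future, 30.seconds)
    } catch {
      case NonFatal(t) =>
        throw new RuntimeException("Exception thrown in awaitResult: ", t)
    }
  }
}
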
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index c5a5876a89..21cb94142b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -27,10 +27,11 @@ import scala.collection.mutable
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.io.Source
+import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core.JsonProcessingException
 
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -258,13 +259,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
       }
     }
 
+    // scalastyle:off awaitresult
     try { Await.result(responseFuture, 10.seconds) } catch {
+      // scalastyle:on awaitresult
       case unreachable @ (_: FileNotFoundException | _: SocketException) =>
         throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
       case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
        throw new SubmitRestProtocolException("Malformed response received from server", malformed)
      case timeout: TimeoutException =>
        throw new SubmitRestConnectionException("No response from server", timeout)
+      case NonFatal(t) =>
+        throw new SparkException("Exception while waiting for response", t)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 09ce012e4e..cb9d389dd7 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -20,7 +20,7 @@ package org.apache.spark.network
 import java.io.Closeable
 import java.nio.ByteBuffer
 
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
 
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
 import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.util.ThreadUtils
 
 private[spark]
 abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
@@ -100,8 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
         result.success(new NioManagedBuffer(ret))
       }
     })
-
-    Await.result(result.future, Duration.Inf)
+    ThreadUtils.awaitResult(result.future, Duration.Inf)
   }
 
   /**
@@ -119,6 +119,6 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       level: StorageLevel,
       classTag: ClassTag[_]): Unit = {
     val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
-    Await.result(future, Duration.Inf)
+    ThreadUtils.awaitResult(future, Duration.Inf)
   }
 }
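
The RestSubmissionClient hunk above shows how the new case interacts with
existing handlers: known failure modes keep their specific translations, and
the `NonFatal` case only catches what falls through. A self-contained sketch
of that catch ordering — the exception class and future here are
hypothetical, not Spark's:

import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object CatchOrderingSketch {
  final class ConnectionException(msg: String, cause: Throwable)
    extends Exception(msg, cause)

  def awaitResponse(responseFuture: Future[String]): String = {
    try {
      Await.result(responseFuture, 10.seconds)
    } catch {
      // Specific, actionable cases first...
      case timeout: TimeoutException =>
        throw new ConnectionException("No response from server", timeout)
      // ...and only the remainder gets the generic stack-trace-preserving wrapper.
      case NonFatal(t) =>
        throw new RuntimeException("Exception while waiting for response", t)
    }
  }

  def main(args: Array[String]): Unit =
    println(awaitResponse(Future.successful("ok")))
}
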
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 2950df62bf..2761d39e37 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -19,10 +19,11 @@ package org.apache.spark.rpc
 
 import java.util.concurrent.TimeoutException
 
-import scala.concurrent.{Await, Awaitable}
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.util.control.NonFatal
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.util.Utils
 
 /**
@@ -65,14 +66,21 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
 
   /**
    * Wait for the completed result and return it. If the result is not available within this
    * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
-   * @param awaitable the `Awaitable` to be awaited
-   * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+   *
+   * @param future the `Future` to be awaited
+   * @throws RpcTimeoutException if after waiting for the specified time `future`
    *         is still not ready
    */
-  def awaitResult[T](awaitable: Awaitable[T]): T = {
+  def awaitResult[T](future: Future[T]): T = {
+    val wrapAndRethrow: PartialFunction[Throwable, T] = {
+      case NonFatal(t) =>
+        throw new SparkException("Exception thrown in awaitResult", t)
+    }
     try {
-      Await.result(awaitable, duration)
-    } catch addMessageIfTimeout
+      // scalastyle:off awaitresult
+      Await.result(future, duration)
+      // scalastyle:on awaitresult
+    } catch addMessageIfTimeout.orElse(wrapAndRethrow)
   }
 }
 
@@ -82,6 +90,7 @@ private[spark] object RpcTimeout {
   /**
    * Lookup the timeout property in the configuration and create
    * a RpcTimeout with the property key in the description.
+   *
    * @param conf configuration properties containing the timeout
    * @param timeoutProp property key for the timeout in seconds
    * @throws NoSuchElementException if property is not set
@@ -95,6 +104,7 @@ private[spark] object RpcTimeout {
   /**
    * Lookup the timeout property in the configuration and create
    * a RpcTimeout with the property key in the description.
    * Uses the given default value if property is not set
+   *
    * @param conf configuration properties containing the timeout
    * @param timeoutProp property key for the timeout in seconds
    * @param defaultValue default timeout value in seconds if property not found
@@ -109,6 +119,7 @@ private[spark] object RpcTimeout {
    * and create a RpcTimeout with the first set property key in the
    * description.
    * Uses the given default value if property is not set
+   *
    * @param conf configuration properties containing the timeout
    * @param timeoutPropList prioritized list of property keys for the timeout in seconds
    * @param defaultValue default timeout value in seconds if no properties found
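
The `addMessageIfTimeout.orElse(wrapAndRethrow)` line above works because a
`catch` clause in Scala can be any `PartialFunction[Throwable, T]`, and
partial functions compose with `orElse`: the timeout-specific handler is
tried first, and everything else non-fatal falls through to the wrapper. A
minimal sketch of the same composition — handler names and the timeout
message are illustrative:

import java.util.concurrent.TimeoutException

import scala.util.control.NonFatal

object ComposedHandlersSketch {
  def withComposedHandlers[T](body: => T): T = {
    val addMessageIfTimeout: PartialFunction[Throwable, T] = {
      case te: TimeoutException =>
        throw new TimeoutException(te.getMessage + ". This timeout is controlled by spark.rpc.askTimeout")
    }
    val wrapAndRethrow: PartialFunction[Throwable, T] = {
      case NonFatal(t) =>
        throw new RuntimeException("Exception thrown in awaitResult", t)
    }
    // `catch` accepts a PartialFunction value, so the handlers chain cleanly.
    try body catch addMessageIfTimeout.orElse(wrapAndRethrow)
  }

  def main(args: Array[String]): Unit = {
    try withComposedHandlers { throw new TimeoutException("Futures timed out") }
    catch { case te: TimeoutException => println(te.getMessage) }
  }
}
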
reregistration", t) + } } } @@ -802,7 +807,12 @@ private[spark] class BlockManager( logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { // Wait for asynchronous replication to finish - Await.ready(replicationFuture, Duration.Inf) + try { + Await.ready(replicationFuture, Duration.Inf) + } catch { + case NonFatal(t) => + throw new Exception("Error occurred while waiting for replication to finish", t) + } } if (blockWasSuccessfullyStored) { None diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 9abbf4a7a3..5a6dbc8304 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -19,12 +19,15 @@ package org.apache.spark.util import java.util.concurrent._ -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.duration.Duration import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.util.control.NonFatal import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} +import org.apache.spark.SparkException + private[spark] object ThreadUtils { private val sameThreadExecutionContext = @@ -174,4 +177,21 @@ private[spark] object ThreadUtils { false // asyncMode ) } + + // scalastyle:off awaitresult + /** + * Preferred alternative to [[Await.result()]]. This method wraps and re-throws any exceptions + * thrown by the underlying [[Await]] call, ensuring that this thread's stack trace appears in + * logs. + */ + @throws(classOf[SparkException]) + def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = { + try { + Await.result(awaitable, atMost) + // scalastyle:on awaitresult + } catch { + case NonFatal(t) => + throw new SparkException("Exception thrown in awaitResult: ", t) + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 78e164cff7..848f7d7adb 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1598,6 +1598,7 @@ private[spark] object Utils extends Logging { /** * Timing method based on iterations that permit JVM JIT optimization. + * * @param numIters number of iterations * @param f function to be executed. If prepare is not None, the running time of each call to f * must be an order of magnitude longer than one millisecond for accurate timing. @@ -1639,6 +1640,7 @@ private[spark] object Utils extends Logging { /** * Creates a symlink. + * * @param src absolute path to the source * @param dst relative path for the destination */ -- cgit v1.2.3