aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-04-19 10:38:10 -0700
committerReynold Xin <rxin@databricks.com>2016-04-19 10:38:10 -0700
commit947b9020b0d621bc97661a0a056297e6889936d3 (patch)
treea845b73cf950369e618cb70795c2831bd9298080 /core/src/main/scala/org/apache
parentd9620e769e41541347db863907bdbd057db50823 (diff)
downloadspark-947b9020b0d621bc97661a0a056297e6889936d3.tar.gz
spark-947b9020b0d621bc97661a0a056297e6889936d3.tar.bz2
spark-947b9020b0d621bc97661a0a056297e6889936d3.zip
[SPARK-14676] Wrap and re-throw Await.result exceptions in order to capture full stacktrace
When `Await.result` throws an exception which originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread. This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`. I tested this manually using https://github.com/JoshRosen/spark/commit/16b31c825197ee31a50214c6ba3c1df08148f403, a patch which reproduces an issue where an RPC exception which occurs while unpersisting RDDs manages to crash the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR. /cc rxin nongli yhuai anabranch Author: Josh Rosen <joshrosen@databricks.com> Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/FutureAction.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/network/BlockTransferService.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/util/ThreadUtils.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala2
9 files changed, 75 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 339266a5d4..a50600f148 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.JobWaiter
+import org.apache.spark.util.ThreadUtils
/**
@@ -45,6 +46,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Blocks until this action completes.
+ *
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @return this FutureAction
@@ -53,6 +55,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Awaits and returns the result (of type T) of this action.
+ *
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @throws Exception exception during action execution
@@ -89,8 +92,8 @@ trait FutureAction[T] extends Future[T] {
/**
* Blocks and returns the result of this job.
*/
- @throws(classOf[Exception])
- def get(): T = Await.result(this, Duration.Inf)
+ @throws(classOf[SparkException])
+ def get(): T = ThreadUtils.awaitResult(this, Duration.Inf)
/**
* Returns the job IDs run by the underlying async operation.
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index abb98f95a1..79f4d06c84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -35,7 +35,7 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -265,7 +265,7 @@ private object FaultToleranceTest extends App with Logging {
}
// Avoid waiting indefinitely (e.g., we could register but get no executors).
- assertTrue(Await.result(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
}
/**
@@ -318,7 +318,7 @@ private object FaultToleranceTest extends App with Logging {
}
try {
- assertTrue(Await.result(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
@@ -422,7 +422,7 @@ private object SparkDocker {
}
dockerCmd.run(ProcessLogger(findIpAndLog _))
- val ip = Await.result(ipPromise.future, 30 seconds)
+ val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b443e8f051..edc9be2a8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -24,7 +24,7 @@ import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random
@@ -959,7 +959,7 @@ private[deploy] class Master(
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val futureUI = asyncRebuildSparkUI(app)
- Await.result(futureUI, Duration.Inf)
+ ThreadUtils.awaitResult(futureUI, Duration.Inf)
}
/** Rebuild a new SparkUI asynchronously to not block RPC event loop */
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index c5a5876a89..21cb94142b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -27,10 +27,11 @@ import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source
+import scala.util.control.NonFatal
import com.fasterxml.jackson.core.JsonProcessingException
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -258,13 +259,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
}
}
+ // scalastyle:off awaitresult
try { Await.result(responseFuture, 10.seconds) } catch {
+ // scalastyle:on awaitresult
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
throw new SubmitRestProtocolException("Malformed response received from server", malformed)
case timeout: TimeoutException =>
throw new SubmitRestConnectionException("No response from server", timeout)
+ case NonFatal(t) =>
+ throw new SparkException("Exception while waiting for response", t)
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 09ce012e4e..cb9d389dd7 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -20,7 +20,7 @@ package org.apache.spark.network
import java.io.Closeable
import java.nio.ByteBuffer
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.util.ThreadUtils
private[spark]
abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
@@ -100,8 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
result.success(new NioManagedBuffer(ret))
}
})
-
- Await.result(result.future, Duration.Inf)
+ ThreadUtils.awaitResult(result.future, Duration.Inf)
}
/**
@@ -119,6 +119,6 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
- Await.result(future, Duration.Inf)
+ ThreadUtils.awaitResult(future, Duration.Inf)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 2950df62bf..2761d39e37 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -19,10 +19,11 @@ package org.apache.spark.rpc
import java.util.concurrent.TimeoutException
-import scala.concurrent.{Await, Awaitable}
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
+import scala.util.control.NonFatal
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.util.Utils
/**
@@ -65,14 +66,21 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
/**
* Wait for the completed result and return it. If the result is not available within this
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
- * @param awaitable the `Awaitable` to be awaited
- * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+ *
+ * @param future the `Future` to be awaited
+ * @throws RpcTimeoutException if after waiting for the specified time `future`
* is still not ready
*/
- def awaitResult[T](awaitable: Awaitable[T]): T = {
+ def awaitResult[T](future: Future[T]): T = {
+ val wrapAndRethrow: PartialFunction[Throwable, T] = {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult", t)
+ }
try {
- Await.result(awaitable, duration)
- } catch addMessageIfTimeout
+ // scalastyle:off awaitresult
+ Await.result(future, duration)
+ // scalastyle:on awaitresult
+ } catch addMessageIfTimeout.orElse(wrapAndRethrow)
}
}
@@ -82,6 +90,7 @@ private[spark] object RpcTimeout {
/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
+ *
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @throws NoSuchElementException if property is not set
@@ -95,6 +104,7 @@ private[spark] object RpcTimeout {
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* Uses the given default value if property is not set
+ *
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @param defaultValue default timeout value in seconds if property not found
@@ -109,6 +119,7 @@ private[spark] object RpcTimeout {
* and create a RpcTimeout with the first set property key in the
* description.
* Uses the given default value if property is not set
+ *
* @param conf configuration properties containing the timeout
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 35a6c63ad1..22bc76b143 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -260,7 +260,12 @@ private[spark] class BlockManager(
def waitForAsyncReregister(): Unit = {
val task = asyncReregisterTask
if (task != null) {
- Await.ready(task, Duration.Inf)
+ try {
+ Await.ready(task, Duration.Inf)
+ } catch {
+ case NonFatal(t) =>
+ throw new Exception("Error occurred while waiting for async. reregistration", t)
+ }
}
}
@@ -802,7 +807,12 @@ private[spark] class BlockManager(
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
// Wait for asynchronous replication to finish
- Await.ready(replicationFuture, Duration.Inf)
+ try {
+ Await.ready(replicationFuture, Duration.Inf)
+ } catch {
+ case NonFatal(t) =>
+ throw new Exception("Error occurred while waiting for replication to finish", t)
+ }
}
if (blockWasSuccessfullyStored) {
None
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 9abbf4a7a3..5a6dbc8304 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,12 +19,15 @@ package org.apache.spark.util
import java.util.concurrent._
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+import org.apache.spark.SparkException
+
private[spark] object ThreadUtils {
private val sameThreadExecutionContext =
@@ -174,4 +177,21 @@ private[spark] object ThreadUtils {
false // asyncMode
)
}
+
+ // scalastyle:off awaitresult
+ /**
+ * Preferred alternative to [[Await.result()]]. This method wraps and re-throws any exceptions
+ * thrown by the underlying [[Await]] call, ensuring that this thread's stack trace appears in
+ * logs.
+ */
+ @throws(classOf[SparkException])
+ def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ Await.result(awaitable, atMost)
+ // scalastyle:on awaitresult
+ } catch {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult: ", t)
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 78e164cff7..848f7d7adb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1598,6 +1598,7 @@ private[spark] object Utils extends Logging {
/**
* Timing method based on iterations that permit JVM JIT optimization.
+ *
* @param numIters number of iterations
* @param f function to be executed. If prepare is not None, the running time of each call to f
* must be an order of magnitude longer than one millisecond for accurate timing.
@@ -1639,6 +1640,7 @@ private[spark] object Utils extends Logging {
/**
* Creates a symlink.
+ *
* @param src absolute path to the source
* @param dst relative path for the destination
*/