From 947b9020b0d621bc97661a0a056297e6889936d3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 19 Apr 2016 10:38:10 -0700 Subject: [SPARK-14676] Wrap and re-throw Await.result exceptions in order to capture full stacktrace When `Await.result` throws an exception which originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread. This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`. I tested this manually using https://github.com/JoshRosen/spark/commit/16b31c825197ee31a50214c6ba3c1df08148f403, a patch which reproduces an issue where an RPC exception which occurs while unpersisting RDDs manages to crash the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR. /cc rxin nongli yhuai anabranch Author: Josh Rosen Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions. --- .../org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 4 ++-- .../org/apache/spark/streaming/util/BatchedWriteAheadLog.scala | 7 ++++--- .../scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 5 +++-- 3 files changed, 9 insertions(+), 7 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index f381fa4094..a7d870500f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.receiver -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.language.{existentials, postfixOps} import scala.util.control.NonFatal @@ -213,7 +213,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Combine the futures, wait for both to complete, and return the write ahead log record handle val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) - val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) + val walRecordHandle = ThreadUtils.awaitResult(combinedFuture, blockStoreTimeout) WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 165e81ea41..71f3304f1b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -23,14 +23,14 @@ import java.util.concurrent.LinkedBlockingQueue import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, Promise} +import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * A wrapper for a WriteAheadLog that batches records before writing data. Handles aggregation @@ -80,7 +80,8 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp } } if (putSuccessfully) { - Await.result(promise.future, WriteAheadLogUtils.getBatchingTimeout(conf).milliseconds) + ThreadUtils.awaitResult( + promise.future, WriteAheadLogUtils.getBatchingTimeout(conf).milliseconds) } else { throw new IllegalStateException("close() was called on BatchedWriteAheadLog before " + s"write request with time $time could be fulfilled.") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 8c980dee2c..24cb5afee3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -38,7 +38,7 @@ import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually._ import org.scalatest.mock.MockitoSugar -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.streaming.scheduler._ import org.apache.spark.util.{CompletionIterator, ManualClock, ThreadUtils, Utils} @@ -471,10 +471,11 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes val batchedWal = new BatchedWriteAheadLog(wal, sparkConf) - intercept[RuntimeException] { + val e = intercept[SparkException] { val buffer = mock[ByteBuffer] batchedWal.write(buffer, 2L) } + assert(e.getCause.getMessage === "Hello!") } // we make the write requests in separate threads so that we don't block the test thread -- cgit v1.2.3