diff options
Diffstat (limited to 'core/src/main/scala')
9 files changed, 75 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 339266a5d4..a50600f148 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.JavaFutureAction import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.JobWaiter +import org.apache.spark.util.ThreadUtils /** @@ -45,6 +46,7 @@ trait FutureAction[T] extends Future[T] { /** * Blocks until this action completes. + * * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf * for unbounded waiting, or a finite positive duration * @return this FutureAction @@ -53,6 +55,7 @@ trait FutureAction[T] extends Future[T] { /** * Awaits and returns the result (of type T) of this action. + * * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf * for unbounded waiting, or a finite positive duration * @throws Exception exception during action execution @@ -89,8 +92,8 @@ trait FutureAction[T] extends Future[T] { /** * Blocks and returns the result of this job. */ - @throws(classOf[Exception]) - def get(): T = Await.result(this, Duration.Inf) + @throws(classOf[SparkException]) + def get(): T = ThreadUtils.awaitResult(this, Duration.Inf) /** * Returns the job IDs run by the underlying async operation. diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index abb98f95a1..79f4d06c84 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets import java.util.concurrent.TimeoutException import scala.collection.mutable.ListBuffer -import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.{Future, Promise} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.language.postfixOps @@ -35,7 +35,7 @@ import org.json4s.jackson.JsonMethods import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.master.RecoveryState import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master. @@ -265,7 +265,7 @@ private object FaultToleranceTest extends App with Logging { } // Avoid waiting indefinitely (e.g., we could register but get no executors). - assertTrue(Await.result(f, 120 seconds)) + assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) } /** @@ -318,7 +318,7 @@ private object FaultToleranceTest extends App with Logging { } try { - assertTrue(Await.result(f, 120 seconds)) + assertTrue(ThreadUtils.awaitResult(f, 120 seconds)) } catch { case e: TimeoutException => logError("Master states: " + masters.map(_.state)) @@ -422,7 +422,7 @@ private object SparkDocker { } dockerCmd.run(ProcessLogger(findIpAndLog _)) - val ip = Await.result(ipPromise.future, 30 seconds) + val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds) val dockerId = Docker.getLastProcessId (ip, dockerId, outFile) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b443e8f051..edc9be2a8a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -24,7 +24,7 @@ import java.util.Date import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.language.postfixOps import scala.util.Random @@ -959,7 +959,7 @@ private[deploy] class Master( */ private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = { val futureUI = asyncRebuildSparkUI(app) - Await.result(futureUI, Duration.Inf) + ThreadUtils.awaitResult(futureUI, Duration.Inf) } /** Rebuild a new SparkUI asynchronously to not block RPC event loop */ diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index c5a5876a89..21cb94142b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -27,10 +27,11 @@ import scala.collection.mutable import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.io.Source +import scala.util.control.NonFatal import com.fasterxml.jackson.core.JsonProcessingException -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} +import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -258,13 +259,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { } } + // scalastyle:off awaitresult try { Await.result(responseFuture, 10.seconds) } catch { + // scalastyle:on awaitresult case unreachable @ (_: FileNotFoundException | _: SocketException) => throw new SubmitRestConnectionException("Unable to connect to server", unreachable) case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => throw new SubmitRestProtocolException("Malformed response received from server", malformed) case timeout: TimeoutException => throw new SubmitRestConnectionException("No response from server", timeout) + case NonFatal(t) => + throw new SparkException("Exception while waiting for response", t) } } diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index 09ce012e4e..cb9d389dd7 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -20,7 +20,7 @@ package org.apache.spark.network import java.io.Closeable import java.nio.ByteBuffer -import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.{Future, Promise} import scala.concurrent.duration.Duration import scala.reflect.ClassTag @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient} import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.util.ThreadUtils private[spark] abstract class BlockTransferService extends ShuffleClient with Closeable with Logging { @@ -100,8 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo result.success(new NioManagedBuffer(ret)) } }) - - Await.result(result.future, Duration.Inf) + ThreadUtils.awaitResult(result.future, Duration.Inf) } /** @@ -119,6 +119,6 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo level: StorageLevel, classTag: ClassTag[_]): Unit = { val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag) - Await.result(future, Duration.Inf) + ThreadUtils.awaitResult(future, Duration.Inf) } } diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala index 2950df62bf..2761d39e37 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala @@ -19,10 +19,11 @@ package org.apache.spark.rpc import java.util.concurrent.TimeoutException -import scala.concurrent.{Await, Awaitable} +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ +import scala.util.control.NonFatal -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.util.Utils /** @@ -65,14 +66,21 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S /** * Wait for the completed result and return it. If the result is not available within this * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout. - * @param awaitable the `Awaitable` to be awaited - * @throws RpcTimeoutException if after waiting for the specified time `awaitable` + * + * @param future the `Future` to be awaited + * @throws RpcTimeoutException if after waiting for the specified time `future` * is still not ready */ - def awaitResult[T](awaitable: Awaitable[T]): T = { + def awaitResult[T](future: Future[T]): T = { + val wrapAndRethrow: PartialFunction[Throwable, T] = { + case NonFatal(t) => + throw new SparkException("Exception thrown in awaitResult", t) + } try { - Await.result(awaitable, duration) - } catch addMessageIfTimeout + // scalastyle:off awaitresult + Await.result(future, duration) + // scalastyle:on awaitresult + } catch addMessageIfTimeout.orElse(wrapAndRethrow) } } @@ -82,6 +90,7 @@ private[spark] object RpcTimeout { /** * Lookup the timeout property in the configuration and create * a RpcTimeout with the property key in the description. + * * @param conf configuration properties containing the timeout * @param timeoutProp property key for the timeout in seconds * @throws NoSuchElementException if property is not set @@ -95,6 +104,7 @@ private[spark] object RpcTimeout { * Lookup the timeout property in the configuration and create * a RpcTimeout with the property key in the description. * Uses the given default value if property is not set + * * @param conf configuration properties containing the timeout * @param timeoutProp property key for the timeout in seconds * @param defaultValue default timeout value in seconds if property not found @@ -109,6 +119,7 @@ private[spark] object RpcTimeout { * and create a RpcTimeout with the first set property key in the * description. * Uses the given default value if property is not set + * * @param conf configuration properties containing the timeout * @param timeoutPropList prioritized list of property keys for the timeout in seconds * @param defaultValue default timeout value in seconds if no properties found diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 35a6c63ad1..22bc76b143 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -260,7 +260,12 @@ private[spark] class BlockManager( def waitForAsyncReregister(): Unit = { val task = asyncReregisterTask if (task != null) { - Await.ready(task, Duration.Inf) + try { + Await.ready(task, Duration.Inf) + } catch { + case NonFatal(t) => + throw new Exception("Error occurred while waiting for async. reregistration", t) + } } } @@ -802,7 +807,12 @@ private[spark] class BlockManager( logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { // Wait for asynchronous replication to finish - Await.ready(replicationFuture, Duration.Inf) + try { + Await.ready(replicationFuture, Duration.Inf) + } catch { + case NonFatal(t) => + throw new Exception("Error occurred while waiting for replication to finish", t) + } } if (blockWasSuccessfullyStored) { None diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 9abbf4a7a3..5a6dbc8304 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -19,12 +19,15 @@ package org.apache.spark.util import java.util.concurrent._ -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.duration.Duration import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.util.control.NonFatal import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} +import org.apache.spark.SparkException + private[spark] object ThreadUtils { private val sameThreadExecutionContext = @@ -174,4 +177,21 @@ private[spark] object ThreadUtils { false // asyncMode ) } + + // scalastyle:off awaitresult + /** + * Preferred alternative to [[Await.result()]]. This method wraps and re-throws any exceptions + * thrown by the underlying [[Await]] call, ensuring that this thread's stack trace appears in + * logs. + */ + @throws(classOf[SparkException]) + def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = { + try { + Await.result(awaitable, atMost) + // scalastyle:on awaitresult + } catch { + case NonFatal(t) => + throw new SparkException("Exception thrown in awaitResult: ", t) + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 78e164cff7..848f7d7adb 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1598,6 +1598,7 @@ private[spark] object Utils extends Logging { /** * Timing method based on iterations that permit JVM JIT optimization. + * * @param numIters number of iterations * @param f function to be executed. If prepare is not None, the running time of each call to f * must be an order of magnitude longer than one millisecond for accurate timing. @@ -1639,6 +1640,7 @@ private[spark] object Utils extends Logging { /** * Creates a symlink. + * * @param src absolute path to the source * @param dst relative path for the destination */ |