aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/FutureAction.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/network/BlockTransferService.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/util/ThreadUtils.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala2
9 files changed, 75 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 339266a5d4..a50600f148 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.JobWaiter
+import org.apache.spark.util.ThreadUtils
/**
@@ -45,6 +46,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Blocks until this action completes.
+ *
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @return this FutureAction
@@ -53,6 +55,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Awaits and returns the result (of type T) of this action.
+ *
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @throws Exception exception during action execution
@@ -89,8 +92,8 @@ trait FutureAction[T] extends Future[T] {
/**
* Blocks and returns the result of this job.
*/
- @throws(classOf[Exception])
- def get(): T = Await.result(this, Duration.Inf)
+ @throws(classOf[SparkException])
+ def get(): T = ThreadUtils.awaitResult(this, Duration.Inf)
/**
* Returns the job IDs run by the underlying async operation.
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index abb98f95a1..79f4d06c84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -35,7 +35,7 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -265,7 +265,7 @@ private object FaultToleranceTest extends App with Logging {
}
// Avoid waiting indefinitely (e.g., we could register but get no executors).
- assertTrue(Await.result(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
}
/**
@@ -318,7 +318,7 @@ private object FaultToleranceTest extends App with Logging {
}
try {
- assertTrue(Await.result(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
@@ -422,7 +422,7 @@ private object SparkDocker {
}
dockerCmd.run(ProcessLogger(findIpAndLog _))
- val ip = Await.result(ipPromise.future, 30 seconds)
+ val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b443e8f051..edc9be2a8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -24,7 +24,7 @@ import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random
@@ -959,7 +959,7 @@ private[deploy] class Master(
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val futureUI = asyncRebuildSparkUI(app)
- Await.result(futureUI, Duration.Inf)
+ ThreadUtils.awaitResult(futureUI, Duration.Inf)
}
/** Rebuild a new SparkUI asynchronously to not block RPC event loop */
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index c5a5876a89..21cb94142b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -27,10 +27,11 @@ import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source
+import scala.util.control.NonFatal
import com.fasterxml.jackson.core.JsonProcessingException
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -258,13 +259,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
}
}
+ // scalastyle:off awaitresult
try { Await.result(responseFuture, 10.seconds) } catch {
+ // scalastyle:on awaitresult
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
throw new SubmitRestProtocolException("Malformed response received from server", malformed)
case timeout: TimeoutException =>
throw new SubmitRestConnectionException("No response from server", timeout)
+ case NonFatal(t) =>
+ throw new SparkException("Exception while waiting for response", t)
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 09ce012e4e..cb9d389dd7 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -20,7 +20,7 @@ package org.apache.spark.network
import java.io.Closeable
import java.nio.ByteBuffer
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.util.ThreadUtils
private[spark]
abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
@@ -100,8 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
result.success(new NioManagedBuffer(ret))
}
})
-
- Await.result(result.future, Duration.Inf)
+ ThreadUtils.awaitResult(result.future, Duration.Inf)
}
/**
@@ -119,6 +119,6 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
- Await.result(future, Duration.Inf)
+ ThreadUtils.awaitResult(future, Duration.Inf)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 2950df62bf..2761d39e37 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -19,10 +19,11 @@ package org.apache.spark.rpc
import java.util.concurrent.TimeoutException
-import scala.concurrent.{Await, Awaitable}
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
+import scala.util.control.NonFatal
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.util.Utils
/**
@@ -65,14 +66,21 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
/**
* Wait for the completed result and return it. If the result is not available within this
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
- * @param awaitable the `Awaitable` to be awaited
- * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+ *
+ * @param future the `Future` to be awaited
+ * @throws RpcTimeoutException if after waiting for the specified time `future`
* is still not ready
*/
- def awaitResult[T](awaitable: Awaitable[T]): T = {
+ def awaitResult[T](future: Future[T]): T = {
+ val wrapAndRethrow: PartialFunction[Throwable, T] = {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult", t)
+ }
try {
- Await.result(awaitable, duration)
- } catch addMessageIfTimeout
+ // scalastyle:off awaitresult
+ Await.result(future, duration)
+ // scalastyle:on awaitresult
+ } catch addMessageIfTimeout.orElse(wrapAndRethrow)
}
}
@@ -82,6 +90,7 @@ private[spark] object RpcTimeout {
/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
+ *
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @throws NoSuchElementException if property is not set
@@ -95,6 +104,7 @@ private[spark] object RpcTimeout {
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* Uses the given default value if property is not set
+ *
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @param defaultValue default timeout value in seconds if property not found
@@ -109,6 +119,7 @@ private[spark] object RpcTimeout {
* and create a RpcTimeout with the first set property key in the
* description.
* Uses the given default value if property is not set
+ *
* @param conf configuration properties containing the timeout
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 35a6c63ad1..22bc76b143 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -260,7 +260,12 @@ private[spark] class BlockManager(
def waitForAsyncReregister(): Unit = {
val task = asyncReregisterTask
if (task != null) {
- Await.ready(task, Duration.Inf)
+ try {
+ Await.ready(task, Duration.Inf)
+ } catch {
+ case NonFatal(t) =>
+ throw new Exception("Error occurred while waiting for async. reregistration", t)
+ }
}
}
@@ -802,7 +807,12 @@ private[spark] class BlockManager(
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
// Wait for asynchronous replication to finish
- Await.ready(replicationFuture, Duration.Inf)
+ try {
+ Await.ready(replicationFuture, Duration.Inf)
+ } catch {
+ case NonFatal(t) =>
+ throw new Exception("Error occurred while waiting for replication to finish", t)
+ }
}
if (blockWasSuccessfullyStored) {
None
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 9abbf4a7a3..5a6dbc8304 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,12 +19,15 @@ package org.apache.spark.util
import java.util.concurrent._
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+import org.apache.spark.SparkException
+
private[spark] object ThreadUtils {
private val sameThreadExecutionContext =
@@ -174,4 +177,21 @@ private[spark] object ThreadUtils {
false // asyncMode
)
}
+
+ // scalastyle:off awaitresult
+ /**
+ * Preferred alternative to [[Await.result()]]. This method wraps and re-throws any exceptions
+ * thrown by the underlying [[Await]] call, ensuring that this thread's stack trace appears in
+ * logs.
+ */
+ @throws(classOf[SparkException])
+ def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ Await.result(awaitable, atMost)
+ // scalastyle:on awaitresult
+ } catch {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult: ", t)
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 78e164cff7..848f7d7adb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1598,6 +1598,7 @@ private[spark] object Utils extends Logging {
/**
* Timing method based on iterations that permit JVM JIT optimization.
+ *
* @param numIters number of iterations
* @param f function to be executed. If prepare is not None, the running time of each call to f
* must be an order of magnitude longer than one millisecond for accurate timing.
@@ -1639,6 +1640,7 @@ private[spark] object Utils extends Logging {
/**
* Creates a symlink.
+ *
* @param src absolute path to the source
* @param dst relative path for the destination
*/