path: root/core/src
author    Shixiong Zhu <shixiong@databricks.com> 2016-10-26 10:36:36 -0700
committer Shixiong Zhu <shixiong@databricks.com> 2016-10-26 10:36:36 -0700
commit    7ac70e7ba8d610a45c21a70dc28e4c989c19451b (patch)
tree      14cb17b246590e2d897f354f87e955b0d55b4b1f /core/src
parent    312ea3f7f65532818e11016d6d780ad47485175f (diff)
[SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL
## What changes were proposed in this pull request?

Calling `Await.result` allows other tasks to run on the same thread when using a ForkJoinPool. However, SQL uses a `ThreadLocal` execution id to trace the Spark jobs launched by a query, so concurrent executions interleaved on the same ForkJoinPool thread can observe the wrong execution id. This PR uses `Awaitable.result` directly instead, which prevents ForkJoinPool from running other tasks in the current waiting thread.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15520 from zsxwing/SPARK-13747.
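The mechanism the commit message describes can be sketched as follows. This is a minimal illustration, not Spark's actual code: `awaitDirect` is a hypothetical name, and it mirrors the patch's trick of passing a null `CanAwait` to `Awaitable.result` so that `scala.concurrent.Await`'s `blocking` wrapper (and hence ForkJoinPool's managed-blocking behavior) is never engaged.

```scala
import scala.concurrent.{Awaitable, CanAwait, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitSketch {
  // Call Awaitable.result directly instead of Await.result.
  // Await.result wraps the wait in `blocking { ... }`, which lets a
  // ForkJoinPool schedule other tasks onto the waiting thread; calling
  // result() with a null CanAwait permit skips that wrapper entirely,
  // so ThreadLocal state on this thread cannot be observed by other tasks.
  def awaitDirect[T](awaitable: Awaitable[T], atMost: Duration): T = {
    // CanAwait is only a compile-time permission token; it is never
    // dereferenced, so a null cast is safe here.
    val permit = null.asInstanceOf[CanAwait]
    awaitable.result(atMost)(permit)
  }

  def main(args: Array[String]): Unit = {
    val f = Future { 1 + 1 }
    println(awaitDirect(f, 10.seconds)) // prints 2
  }
}
```

The trade-off is that the waiting thread is fully parked: the pool loses the chance to compensate for the blocked worker, which is exactly the point when per-thread state must stay untouched.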
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/org/apache/spark/util/ThreadUtils.scala | 21
1 file changed, 21 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5a6dbc8304..d093e7bfc3 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
+
+ /**
+ * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, and
+ * wraps and re-throws any exceptions with a nice stack trace.
+ *
+ * Code running in the user's thread may be running in a thread of Scala's ForkJoinPool. Because
+ * concurrent executions in a ForkJoinPool may see unexpected [[ThreadLocal]] values, this method
+ * prevents ForkJoinPool from running other tasks in the current waiting thread.
+ */
+ @throws(classOf[SparkException])
+ def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+ // See SPARK-13747.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.result(atMost)(awaitPermission)
+ } catch {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult: ", t)
+ }
+ }
}