From 7ac70e7ba8d610a45c21a70dc28e4c989c19451b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 26 Oct 2016 10:36:36 -0700 Subject: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL ## What changes were proposed in this pull request? Calling `Await.result` will allow other tasks to be run on the same thread when using ForkJoinPool. However, SQL uses a `ThreadLocal` execution id to trace Spark jobs launched by a query, which doesn't work perfectly in ForkJoinPool. This PR just uses `Awaitable.result` instead to prevent ForkJoinPool from running other tasks in the current waiting thread. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15520 from zsxwing/SPARK-13747. --- .../scala/org/apache/spark/util/ThreadUtils.scala | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5a6dbc8304..d093e7bfc3 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -194,4 +194,25 @@ private[spark] object ThreadUtils { throw new SparkException("Exception thrown in awaitResult: ", t) } } + + /** + * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps + * and re-throws any exceptions with nice stack track. + * + * Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent + * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method + * basically prevents ForkJoinPool from running other tasks in the current waiting thread. + */ + @throws(classOf[SparkException]) + def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = { + try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.result(Duration.Inf)(awaitPermission) + } catch { + case NonFatal(t) => + throw new SparkException("Exception thrown in awaitResult: ", t) + } + } } -- cgit v1.2.3