author    Andrew Or <andrew@databricks.com>  2016-03-09 17:34:28 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2016-03-09 17:34:28 -0800
commit    37fcda3e6cf1707fb7a348a4d47231849ef8abf6 (patch)
tree      f24a199e92660428a36988e43301ceffe5c57b16 /core
parent    dbf2a7cfad067d2c553d8b8831e04aace12fcee1 (diff)
[SPARK-13747][SQL] Fix concurrent query with fork-join pool
## What changes were proposed in this pull request?

Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264:

```
(1 to 100).par.foreach { _ =>
  sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
```

This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.

## How was this patch tested?

New test in `SQLExecutionSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #11586 from andrewor14/fix-concurrent-sql.
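The fix replaces `Await.ready` with a direct call to `Awaitable.ready`, since `Await.ready` wraps the wait in `scala.concurrent.blocking`, which misbehaves on a fork-join pool. A minimal standalone sketch of that trick, outside Spark (the helper name `readyWithoutBlocking` is hypothetical, not part of Spark's API):

```scala
import scala.concurrent.{CanAwait, Future}
import scala.concurrent.duration.Duration

// Calling Awaitable.ready directly skips the scala.concurrent.blocking
// wrapper that Await.ready adds. The CanAwait permit exists only to stop
// user code from blocking accidentally; it is never dereferenced by the
// implementation, so passing a null cast is safe here.
def readyWithoutBlocking[T](future: Future[T]): Future[T] = {
  val awaitPermission = null.asInstanceOf[CanAwait]
  future.ready(Duration.Inf)(awaitPermission)
}
```

After the call returns, `future.value` is guaranteed to be defined, which is what the patched `runJob` relies on when it matches on `waiter.completionFuture.value.get`.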
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2eaef5ec4..b576d4c5f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -613,7 +613,12 @@ class DAGScheduler(
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
- Await.ready(waiter.completionFuture, atMost = Duration.Inf)
+ // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
+ // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
+ // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
+ // safe to pass in null here. For more detail, see SPARK-13747.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format