author | Andrew Or <andrew@databricks.com> | 2016-03-09 17:34:28 -0800
---|---|---
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-03-09 17:34:28 -0800
commit | 37fcda3e6cf1707fb7a348a4d47231849ef8abf6 (patch) |
tree | f24a199e92660428a36988e43301ceffe5c57b16 /core |
parent | dbf2a7cfad067d2c553d8b8831e04aace12fcee1 (diff) |
[SPARK-13747][SQL] Fix concurrent query with fork-join pool
## What changes were proposed in this pull request?
Fix the following use case, which was already fixed by SPARK-10548 in 1.6 but was broken again in master by #9264:
```
(1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() }
```
This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.
## How was this patch tested?
New test in `SQLExecutionSuite`.
Author: Andrew Or <andrew@databricks.com>
Closes #11586 from andrewor14/fix-concurrent-sql.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2eaef5ec4..b576d4c5f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -613,7 +613,12 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    Await.ready(waiter.completionFuture, atMost = Duration.Inf)
+    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
+    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
+    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
+    // safe to pass in null here. For more detail, see SPARK-13747.
+    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
```
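The pattern used in the patch can be exercised outside Spark. The minimal sketch below (the value names are illustrative, not from the patch) shows the difference: `Await.ready` routes the wait through `scala.concurrent.blocking`, whereas calling `Awaitable.ready` directly and passing a `CanAwait` token explicitly skips that wrapper. `CanAwait` is a capability marker that is never dereferenced, so passing `null` is safe, as the patch's comment notes.

```scala
import scala.concurrent.{CanAwait, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val f = Future { 21 * 2 }

// Await.ready(f, Duration.Inf) would wrap the wait in `scala.concurrent.blocking`;
// on a fork-join pool that wrapper is what triggered the SPARK-13747 failures.
// Calling `ready` directly with an explicit CanAwait avoids the wrapper. The
// permission token is a phantom capability and is never used, so null is safe.
val awaitPermission = null.asInstanceOf[CanAwait]
f.ready(Duration.Inf)(awaitPermission)

println(f.value.get.get) // prints 42
```

The same `ready(atMost)(permit)` signature comes from the `Awaitable` trait that `Future` implements, which is why the patched `DAGScheduler` can call it on `waiter.completionFuture` directly.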