-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala              7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala  14
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2eaef5ec4..b576d4c5f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -613,7 +613,12 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    Await.ready(waiter.completionFuture, atMost = Duration.Inf)
+    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
+    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
+    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
+    // safe to pass in null here. For more detail, see SPARK-13747.
+    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 824d89e3b2..c9f517ca34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite {
     }
   }
 
+  test("concurrent query execution with fork-join pool (SPARK-13747)") {
+    val sc = new SparkContext("local[*]", "test")
+    val sqlContext = new SQLContext(sc)
+    import sqlContext.implicits._
+    try {
+      // Should not throw IllegalArgumentException
+      (1 to 100).par.foreach { _ =>
+        sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
   /**
    * Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
    */
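
The test above exercises the fix by driving 100 queries through a parallel
collection, which schedules work on a fork-join pool. A rough sketch of the
race the patch removes follows; the names ExecutionIdLeakSketch and runQuery
are hypothetical, and the interleaving is timing-dependent, so several runs
may be needed to observe the leak:

    import java.util.concurrent.CountDownLatch
    import scala.concurrent.{blocking, Future}
    import scala.concurrent.ExecutionContext.Implicits.global

    object ExecutionIdLeakSketch {
      // Models SQL's execution id: an InheritableThreadLocal, so any thread
      // created while it is set inherits the current value.
      private val executionId = new InheritableThreadLocal[String]

      def runQuery(id: String): Unit = {
        val inherited = executionId.get
        if (inherited != null) {
          // In Spark this condition surfaced as
          // IllegalArgumentException: spark.sql.execution.id is already set.
          println(s"stale execution id inherited from another thread: $inherited")
        }
        executionId.set(id)
        try {
          // On the global fork-join pool, `blocking` goes through the pool's
          // BlockContext, which may create a compensation thread while
          // executionId is still set -- and the new thread inherits it.
          blocking { Thread.sleep(10) }
        } finally {
          executionId.remove()
        }
      }

      def main(args: Array[String]): Unit = {
        val latch = new CountDownLatch(100)
        (1 to 100).foreach { i =>
          Future { try runQuery(s"query-$i") finally latch.countDown() }
        }
        latch.await()
      }
    }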