author     Shixiong Zhu <shixiong@databricks.com>    2016-12-13 09:53:22 -0800
committer  Yin Huai <yhuai@databricks.com>           2016-12-13 09:53:22 -0800
commit     fb3081d3b38a50aa5e023c603e1b191e57f7c876 (patch)
tree       b7dfa91670eeb40dffd2ca27224ea27d8a2dabbe /sql/core/src/main
parent     d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a (diff)
[SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool
## What changes were proposed in this pull request?

Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which will finally call `Await.result`. This may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in a Scala ForkJoinPool. This PR includes the following changes to fix the issue:

- Remove `ThreadUtils.awaitResult`
- Rename `ThreadUtils.awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult`
- Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16230 from zsxwing/fix-SPARK-13747.
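For context on what the call sites in the diff below end up invoking: `Await.result` wraps the wait in `blocking { ... }`, which on a ForkJoinPool can spawn extra worker threads and copy inheritable thread-local state onto them. A ForkJoin-safe await can sidestep that by calling `Awaitable.result` directly. The sketch below illustrates that idea only; it is not the exact `ThreadUtils` source, and `WrappedAwaitException` is a hypothetical stand-in for Spark's own exception wrapping.

```scala
import scala.concurrent.{Awaitable, CanAwait}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's exception wrapping; not Spark code.
class WrappedAwaitException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

object AwaitSketch {
  /**
   * Waits for `awaitable` without going through `Await.result`, whose `blocking`
   * wrapper lets a ForkJoinPool create compensation threads (and copy inheritable
   * thread-local state onto them). Calling `result` directly skips that path.
   */
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      // CanAwait is only a compile-time permission token, so a null permit is fine here.
      val permit = null.asInstanceOf[CanAwait]
      awaitable.result(atMost)(permit)
    } catch {
      case NonFatal(t) =>
        throw new WrappedAwaitException("Exception thrown in awaitResult", t)
    }
  }
}
```

A caller would then write `awaitResult(relationFuture, Duration.Inf)` instead of `Await.result(relationFuture, Duration.Inf)`, which is the shape of the two call-site changes in the diff below.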
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala          | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala  | 3
2 files changed, 2 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index e6f1de5cb0..fb90799534 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -578,7 +578,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
   }
 
   override def executeCollect(): Array[InternalRow] = {
-    ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf)
+    ThreadUtils.awaitResult(relationFuture, Duration.Inf)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index ce5013daeb..7be5d31d4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -128,8 +128,7 @@ case class BroadcastExchangeExec(
   }
 
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
-      .asInstanceOf[broadcast.Broadcast[T]]
+    ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
   }
 }
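For readers unfamiliar with the failure mode the commit message describes, the sketch below is a simplified, hypothetical illustration (the names `ForkJoinLeakSketch` and `executionId` are not Spark code): on a ForkJoinPool worker, `Await.result` enters managed blocking, which can create a compensation thread while the worker's inheritable thread-local state is still set, so that state can leak onto the new thread and resurface later as "spark.sql.execution.id is already set".

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical repro sketch; not Spark code.
object ForkJoinLeakSketch {
  // Stand-in for Spark's inheritable per-thread properties (e.g. spark.sql.execution.id).
  private val executionId = new InheritableThreadLocal[String]

  def main(args: Array[String]): Unit = {
    // The global ExecutionContext is backed by a ForkJoinPool.
    implicit val ec: ExecutionContext = ExecutionContext.global

    val outer = Future {
      executionId.set("exec-1")
      // Await.result wraps the wait in `blocking { ... }`; on a ForkJoinPool this can
      // spawn a compensation thread while `executionId` is still set on this worker,
      // so the new thread inherits the value and may still carry it when it later
      // runs unrelated work.
      Await.result(Future { Thread.sleep(100); 42 }, Duration.Inf)
    }
    println(Await.result(outer, Duration.Inf))
  }
}
```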