diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-13 09:53:22 -0800 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-12-13 09:53:22 -0800 |
commit | fb3081d3b38a50aa5e023c603e1b191e57f7c876 (patch) | |
tree | b7dfa91670eeb40dffd2ca27224ea27d8a2dabbe /sql/core/src/main | |
parent | d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a (diff) | |
download | spark-fb3081d3b38a50aa5e023c603e1b191e57f7c876.tar.gz spark-fb3081d3b38a50aa5e023c603e1b191e57f7c876.tar.bz2 spark-fb3081d3b38a50aa5e023c603e1b191e57f7c876.zip |
[SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool
## What changes were proposed in this pull request?
Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which will finally call `Await.result`. It may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in Scala ForkJoinPool.
This PR includes the following changes to fix this issue:
- Remove `ThreadUtils.awaitResult`
- Rename `ThreadUtils. awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult`
- Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16230 from zsxwing/fix-SPARK-13747.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala | 2 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala | 3 |
2 files changed, 2 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index e6f1de5cb0..fb90799534 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -578,7 +578,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { } override def executeCollect(): Array[InternalRow] = { - ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf) + ThreadUtils.awaitResult(relationFuture, Duration.Inf) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index ce5013daeb..7be5d31d4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -128,8 +128,7 @@ case class BroadcastExchangeExec( } override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout) - .asInstanceOf[broadcast.Broadcast[T]] + ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] } } |