diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-13 09:53:22 -0800 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-12-13 09:53:22 -0800 |
commit | fb3081d3b38a50aa5e023c603e1b191e57f7c876 (patch) | |
tree | b7dfa91670eeb40dffd2ca27224ea27d8a2dabbe /core/src/test | |
parent | d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a (diff) | |
download | spark-fb3081d3b38a50aa5e023c603e1b191e57f7c876.tar.gz spark-fb3081d3b38a50aa5e023c603e1b191e57f7c876.tar.bz2 spark-fb3081d3b38a50aa5e023c603e1b191e57f7c876.zip |
[SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool
## What changes were proposed in this pull request?
Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which will finally call `Await.result`. It may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in Scala ForkJoinPool.
This PR includes the following changes to fix this issue:
- Remove `ThreadUtils.awaitResult`
- Rename `ThreadUtils.awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult`
- Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16230 from zsxwing/fix-SPARK-13747.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 3 |
2 files changed, 2 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 58664e77d2..b29a53cffe 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -199,10 +199,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim val f = sc.parallelize(1 to 100, 4) .mapPartitions(itr => { Thread.sleep(20); itr }) .countAsync() - val e = intercept[SparkException] { + intercept[TimeoutException] { ThreadUtils.awaitResult(f, Duration(20, "milliseconds")) } - assert(e.getCause.isInstanceOf[TimeoutException]) } private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 83288db92b..8c4e389e86 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -158,10 +158,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { 0 until rdd.partitions.size, resultHandler, () => Unit) // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete. - val e = intercept[SparkException] { + intercept[TimeoutException] { ThreadUtils.awaitResult(futureAction, 5 seconds) } - assert(e.getCause.isInstanceOf[TimeoutException]) assert(tempDir.list().size === 0) } |