author      Shixiong Zhu <shixiong@databricks.com>    2016-12-13 09:53:22 -0800
committer   Yin Huai <yhuai@databricks.com>           2016-12-13 09:53:22 -0800
commit      fb3081d3b38a50aa5e023c603e1b191e57f7c876 (patch)
tree        b7dfa91670eeb40dffd2ca27224ea27d8a2dabbe /core/src/test
parent      d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a (diff)
[SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool
## What changes were proposed in this pull request?

Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which will finally call `Await.result`. It may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in a Scala ForkJoinPool.

This PR includes the following changes to fix this issue:

- Remove `ThreadUtils.awaitResult`
- Rename `ThreadUtils.awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult`
- Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16230 from zsxwing/fix-SPARK-13747.
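For context on the fix: `scala.concurrent.Await.result` routes the wait through the `blocking` hook, which allows a ForkJoinPool to run unrelated tasks on the waiting thread and thereby expose its ThreadLocals (such as `spark.sql.execution.id`). The ForkJoinPool-safe variant avoids this by calling `Awaitable.result` directly. Below is a minimal, dependency-free sketch of that idea; the real method in `ThreadUtils` wraps failures in `SparkException` (a plain `RuntimeException` is used here to keep the sketch self-contained), and its details may differ from what is shown.

import java.util.concurrent.TimeoutException

import scala.concurrent.{Awaitable, CanAwait}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

object AwaitSketch {
  // Wait on `awaitable` without going through Await.result, whose
  // `blocking` wrapper invokes ForkJoinPool's managed blocking and can
  // schedule other tasks on this thread -- the source of the leak.
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      // CanAwait is only a capability marker, so passing null skips the
      // managed-blocking machinery without changing the wait semantics.
      awaitable.result(atMost)(null.asInstanceOf[CanAwait])
    } catch {
      // A timeout is rethrown as-is rather than wrapped, which is why the
      // test changes below can intercept TimeoutException directly.
      case t: TimeoutException => throw t
      case NonFatal(t) =>
        throw new RuntimeException("Exception thrown in awaitResult", t)
    }
  }
}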
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala                3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala  3
2 files changed, 2 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index 58664e77d2..b29a53cffe 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -199,10 +199,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
val f = sc.parallelize(1 to 100, 4)
.mapPartitions(itr => { Thread.sleep(20); itr })
.countAsync()
- val e = intercept[SparkException] {
+ intercept[TimeoutException] {
ThreadUtils.awaitResult(f, Duration(20, "milliseconds"))
}
- assert(e.getCause.isInstanceOf[TimeoutException])
}
private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
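Both test updates follow the same pattern: a timeout now surfaces as a bare `TimeoutException` instead of a `SparkException` whose cause must be inspected, so the intermediate `val e` and the `getCause` assertion become unnecessary. A hypothetical standalone illustration of the two `intercept` styles, assuming ScalaTest 3.1+ (the suite name and messages are invented for the example):

import java.util.concurrent.TimeoutException

import org.scalatest.funsuite.AnyFunSuite

class InterceptStylesSuite extends AnyFunSuite {

  test("old style: catch the wrapper, then assert on its cause") {
    val e = intercept[RuntimeException] {
      throw new RuntimeException("wrapper", new TimeoutException("timed out"))
    }
    assert(e.getCause.isInstanceOf[TimeoutException])
  }

  test("new style: intercept the exception of interest directly") {
    intercept[TimeoutException] {
      throw new TimeoutException("timed out")
    }
  }
}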
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 83288db92b..8c4e389e86 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -158,10 +158,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
0 until rdd.partitions.size, resultHandler, () => Unit)
// It's an error if the job completes successfully even though no committer was authorized,
// so throw an exception if the job was allowed to complete.
- val e = intercept[SparkException] {
+ intercept[TimeoutException] {
ThreadUtils.awaitResult(futureAction, 5 seconds)
}
- assert(e.getCause.isInstanceOf[TimeoutException])
assert(tempDir.list().size === 0)
}
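A side note on the second hunk: the `5 seconds` argument is postfix-operator sugar from `scala.concurrent.duration._`, which this suite imports. A small sketch of the equivalent forms (the object name is invented for the example):

import scala.concurrent.duration._

object DurationSugar extends App {
  // `5.seconds` (or postfix `5 seconds` where postfix operators are
  // enabled) builds a FiniteDuration equal to Duration(5, SECONDS).
  val timeout: FiniteDuration = 5.seconds
  println(timeout == Duration(5, SECONDS)) // prints: true
}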