author      Kazuaki Ishizaki <ishizaki@jp.ibm.com>    2015-12-24 13:37:28 +0000
committer   Sean Owen <sowen@cloudera.com>            2015-12-24 13:37:28 +0000
commit      392046611837a3a740ff97fa8177ca7c12316fb7 (patch)
tree        3c5149701ceaec57d12dff971fc0a34c05669c31
parent      9e85bb71ad2d7d3a9da0cb8853f3216d37e6ff47 (diff)
[SPARK-12311][CORE] Restore the previous value of the "os.arch" property in test suites after forcing it to a specific value
Restore the original value of the os.arch property after each test. Since some tests force a specific value onto the os.arch property, we need to restore the original value afterwards.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #10289 from kiszk/SPARK-12311.
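The pattern applied across all of these suites is the same: call super.beforeEach()/super.beforeAll() before the suite's own setup, and wrap the suite's own cleanup in try so that super.afterEach()/super.afterAll() always runs in a finally block. A minimal sketch of that ordering, assuming ScalaTest's FunSuite and BeforeAndAfterEach (the suite name and temp-directory cleanup are illustrative only, not taken from this patch):

import java.io.File
import java.nio.file.Files

import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Hypothetical suite showing the ordering this patch enforces everywhere:
// parent setup runs first, and parent teardown always runs, even if the
// suite's own cleanup throws.
class ExampleCleanupSuite extends FunSuite with BeforeAndAfterEach {

  private var tempDir: File = _

  override def beforeEach(): Unit = {
    super.beforeEach()                 // base traits snapshot global state first
    tempDir = Files.createTempDirectory("example-suite").toFile
  }

  override def afterEach(): Unit = {
    try {
      tempDir.delete()                 // suite-specific cleanup; may fail or throw
    } finally {
      super.afterEach()                // base-trait teardown still restores state
    }
  }

  test("temp dir exists during the test") {
    assert(tempDir.exists())
  }
}

Running the parent teardown last is what lets property-restoring base traits undo any System.setProperty calls made during a test, which is how the forced "os.arch" value (set for deterministic results in suites such as SizeEstimatorSuite and BlockManagerSuite) gets put back after each run.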
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/HashShuffleSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/LocalSparkContext.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/SharedSparkContext.scala | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/SortShuffleSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 35
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 4
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala | 7
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala | 7
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala | 11
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala | 15
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala | 15
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 15
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala | 7
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 9
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala | 13
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 18
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 7
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 5
-rw-r--r--  yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 27
49 files changed, 338 insertions(+), 142 deletions(-)
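Several of the suites above that touch JVM system properties directly (SecurityManagerSuite, HistoryServerSuite, CheckpointSuite, ClientSuite, YarnSparkHadoopUtilSuite) now mix in org.apache.spark.util.ResetSystemProperties rather than clearing individual keys by hand. The trait below is only an approximate sketch of that idea, not the actual Spark source: it snapshots System.getProperties before each test and restores the snapshot afterwards, cloning via Commons Lang's SerializationUtils as the BaseYarnClusterSuite and ClientSuite hunks below do.

import java.util.Properties

import org.apache.commons.lang3.SerializationUtils
import org.scalatest.{BeforeAndAfterEach, Suite}

// Approximate sketch of a system-property-restoring trait (hypothetical name,
// not the real ResetSystemProperties implementation): snapshot the JVM system
// properties before each test and put the snapshot back afterwards, so a test
// that calls System.setProperty cannot leak state into later suites.
trait RestoreSystemPropertiesSketch extends BeforeAndAfterEach { this: Suite =>

  private var oldProperties: Properties = null

  override def beforeEach(): Unit = {
    // clone, because System.getProperties returns a live, mutable reference
    oldProperties = SerializationUtils.clone(System.getProperties)
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      System.setProperties(oldProperties)
      oldProperties = null
    }
  }
}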
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 0b19861fc4..f200ff36c7 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -42,6 +42,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
private val conf: SparkConf = new SparkConf(loadDefaults = false)
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
MockitoAnnotations.initMocks(this)
@@ -55,7 +56,11 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
}
test("commit shuffle files multiple times") {
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 553d46285a..390764ba24 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -256,8 +256,11 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
}
override def afterEach(): Unit = {
- super.afterEach()
- Utils.deleteRecursively(checkpointDir)
+ try {
+ Utils.deleteRecursively(checkpointDir)
+ } finally {
+ super.afterEach()
+ }
}
override def sparkContext: SparkContext = sc
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 1c775bcb3d..eb3fb99747 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -35,6 +35,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
var rpcHandler: ExternalShuffleBlockHandler = _
override def beforeAll() {
+ super.beforeAll()
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
val transportContext = new TransportContext(transportConf, rpcHandler)
@@ -46,7 +47,11 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
}
override def afterAll() {
- server.close()
+ try {
+ server.close()
+ } finally {
+ super.afterAll()
+ }
}
// This test ensures that the external shuffle service is actually in use for the other tests.
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 1255e71af6..2c32b69715 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -75,8 +75,11 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}
override def afterAll() {
- super.afterAll()
- Utils.deleteRecursively(tmpDir)
+ try {
+ Utils.deleteRecursively(tmpDir)
+ } finally {
+ super.afterAll()
+ }
}
test("Distributing files locally") {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index fdb00aafc4..f6a7f4375f 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -44,8 +44,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
override def afterEach() {
- super.afterEach()
- Utils.deleteRecursively(tempDir)
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
}
test("text files") {
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
index 19180e88eb..10794235ed 100644
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -24,6 +24,7 @@ class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with hash-based shuffle.
override def beforeAll() {
+ super.beforeAll()
conf.set("spark.shuffle.manager", "hash")
}
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 3cd80c0f7d..9b43341576 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -66,6 +66,7 @@ class HeartbeatReceiverSuite
* that uses a manual clock.
*/
override def beforeEach(): Unit = {
+ super.beforeEach()
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 1168eb0b80..e13a442463 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -38,8 +38,11 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
with LocalSparkContext {
override def afterEach() {
- super.afterEach()
- resetSparkContext()
+ try {
+ resetSparkContext()
+ } finally {
+ super.afterEach()
+ }
}
test("local mode, FIFO scheduler") {
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 8bf2e55def..214681970a 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -28,13 +28,16 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _
override def beforeAll() {
- InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
}
override def afterEach() {
- resetSparkContext()
- super.afterEach()
+ try {
+ resetSparkContext()
+ } finally {
+ super.afterEach()
+ }
}
def resetSparkContext(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 26b95c0678..e0226803bb 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark
import java.io.File
-import org.apache.spark.util.{SparkConfWithEnv, Utils}
+import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
-class SecurityManagerSuite extends SparkFunSuite {
+class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
test("set security with conf") {
val conf = new SparkConf
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 3d2700b7e6..858bc742e0 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -30,13 +30,16 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
var conf = new SparkConf(false)
override def beforeAll() {
- _sc = new SparkContext("local[4]", "test", conf)
super.beforeAll()
+ _sc = new SparkContext("local[4]", "test", conf)
}
override def afterAll() {
- LocalSparkContext.stop(_sc)
- _sc = null
- super.afterAll()
+ try {
+ LocalSparkContext.stop(_sc)
+ _sc = null
+ } finally {
+ super.afterAll()
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index d78c99c2e1..73638d9b13 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -24,6 +24,7 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
override def beforeAll() {
+ super.beforeAll()
conf.set("spark.shuffle.blockTransferService", "netty")
}
}
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index b8ab227517..5354731465 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -37,10 +37,12 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
private var tempDir: File = _
override def beforeAll() {
+ super.beforeAll()
conf.set("spark.shuffle.manager", "sort")
}
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
conf.set("spark.local.dir", tempDir.getAbsolutePath)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 314517d296..85c1c1bbf3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -71,15 +71,18 @@ class StandaloneDynamicAllocationSuite
}
override def afterAll(): Unit = {
- masterRpcEnv.shutdown()
- workerRpcEnvs.foreach(_.shutdown())
- master.stop()
- workers.foreach(_.stop())
- masterRpcEnv = null
- workerRpcEnvs = null
- master = null
- workers = null
- super.afterAll()
+ try {
+ masterRpcEnv.shutdown()
+ workerRpcEnvs.foreach(_.shutdown())
+ master.stop()
+ workers.foreach(_.stop())
+ masterRpcEnv = null
+ workerRpcEnvs = null
+ master = null
+ workers = null
+ } finally {
+ super.afterAll()
+ }
}
test("dynamic allocation default behavior") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 1e5c05a73f..415e2b37db 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -63,15 +63,18 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
}
override def afterAll(): Unit = {
- workerRpcEnvs.foreach(_.shutdown())
- masterRpcEnv.shutdown()
- workers.foreach(_.stop())
- master.stop()
- workerRpcEnvs = null
- masterRpcEnv = null
- workers = null
- master = null
- super.afterAll()
+ try {
+ workerRpcEnvs.foreach(_.shutdown())
+ masterRpcEnv.shutdown()
+ workers.foreach(_.stop())
+ master.stop()
+ workerRpcEnvs = null
+ masterRpcEnv = null
+ workers = null
+ master = null
+ } finally {
+ super.afterAll()
+ }
}
test("interface methods of AppClient using local Master") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 4b7fd4f13b..18659fc0c1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.mock.MockitoSugar
import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark.util.ResetSystemProperties
/**
* A collection of tests against the historyserver, including comparing responses from the json
@@ -43,7 +44,7 @@ import org.apache.spark.ui.{SparkUI, UIUtils}
* are considered part of Spark's public api.
*/
class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
- with JsonTestUtils {
+ with JsonTestUtils with ResetSystemProperties {
private val logDir = new File("src/test/resources/spark-events")
private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 9693e32bf6..fa39aa2cb1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -43,8 +43,12 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
private var server: Option[RestSubmissionServer] = None
override def afterEach() {
- rpcEnv.foreach(_.shutdown())
- server.foreach(_.stop())
+ try {
+ rpcEnv.foreach(_.shutdown())
+ server.foreach(_.stop())
+ } finally {
+ super.afterEach()
+ }
}
test("construct submit request") {
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 8a199459c1..24184b02cb 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -47,6 +47,7 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite with BeforeAndAfterAl
// hard-to-reproduce test failures, since any suites that were run after this one would inherit
// the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
// we disable FileSystem caching in this suite.
+ super.beforeAll()
val conf = new SparkConf().set("spark.hadoop.fs.file.impl.disable.cache", "true")
sc = new SparkContext("local", "test", conf)
@@ -59,7 +60,11 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite with BeforeAndAfterAl
}
override def afterAll() {
- sc.stop()
+ try {
+ sc.stop()
+ } finally {
+ super.afterAll()
+ }
}
private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index 6f8e8a7ac6..92daf4e6a2 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -31,14 +31,18 @@ class NettyBlockTransferServiceSuite
private var service1: NettyBlockTransferService = _
override def afterEach() {
- if (service0 != null) {
- service0.close()
- service0 = null
- }
+ try {
+ if (service0 != null) {
+ service0.close()
+ service0 = null
+ }
- if (service1 != null) {
- service1.close()
- service1 = null
+ if (service1 != null) {
+ service1.close()
+ service1 = null
+ }
+ } finally {
+ super.afterEach()
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index de015ebd5d..d18bde790b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -34,12 +34,17 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
@transient private var sc: SparkContext = _
override def beforeAll() {
+ super.beforeAll()
sc = new SparkContext("local[2]", "test")
}
override def afterAll() {
- LocalSparkContext.stop(sc)
- sc = null
+ try {
+ LocalSparkContext.stop(sc)
+ sc = null
+ } finally {
+ super.afterAll()
+ }
}
lazy val zeroPartRdd = new EmptyRDD[Int](sc)
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 5103eb74b2..3a22a9850a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.{RDDBlockId, StorageLevel}
class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
override def beforeEach(): Unit = {
+ super.beforeEach()
sc = new SparkContext("local[2]", "test")
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 7b3a17c172..9c850c0da5 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -44,6 +44,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
var env: RpcEnv = _
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf()
env = createRpcEnv(conf, "local", 0)
@@ -53,10 +54,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
override def afterAll(): Unit = {
- if (env != null) {
- env.shutdown()
+ try {
+ if (env != null) {
+ env.shutdown()
+ }
+ SparkEnv.set(null)
+ } finally {
+ super.afterAll()
}
- SparkEnv.set(null)
}
def createRpcEnv(conf: SparkConf, name: String, port: Int, clientMode: Boolean = false): RpcEnv
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
index 2d5e9d66b2..683aaa3aab 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
@@ -29,6 +29,7 @@ class SerializationDebuggerSuite extends SparkFunSuite with BeforeAndAfterEach {
import SerializationDebugger.find
override def beforeEach(): Unit = {
+ super.beforeEach()
SerializationDebugger.enableDebugging = true
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index d3b1b2b620..bb331bb385 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -55,6 +55,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
outputFile = File.createTempFile("shuffle", null, tempDir)
taskMetrics = new TaskMetrics
@@ -119,9 +120,13 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
- blockIdToFileMap.clear()
- temporaryFilesCreated.clear()
+ try {
+ Utils.deleteRecursively(tempDir)
+ blockIdToFileMap.clear()
+ temporaryFilesCreated.clear()
+ } finally {
+ super.afterEach()
+ }
}
test("write empty iterator") {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bf49be3d4c..2224a444c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -79,6 +79,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
override def beforeEach(): Unit = {
+ super.beforeEach()
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
@@ -97,22 +98,26 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
override def afterEach(): Unit = {
- if (store != null) {
- store.stop()
- store = null
- }
- if (store2 != null) {
- store2.stop()
- store2 = null
- }
- if (store3 != null) {
- store3.stop()
- store3 = null
+ try {
+ if (store != null) {
+ store.stop()
+ store = null
+ }
+ if (store2 != null) {
+ store2.stop()
+ store2 = null
+ }
+ if (store3 != null) {
+ store3.stop()
+ store3 = null
+ }
+ rpcEnv.shutdown()
+ rpcEnv.awaitTermination()
+ rpcEnv = null
+ master = null
+ } finally {
+ super.afterEach()
}
- rpcEnv.shutdown()
- rpcEnv.awaitTermination()
- rpcEnv = null
- master = null
}
test("StorageLevel object caching") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 688f56f466..69e17461df 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -45,19 +45,27 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
}
override def afterAll() {
- super.afterAll()
- Utils.deleteRecursively(rootDir0)
- Utils.deleteRecursively(rootDir1)
+ try {
+ Utils.deleteRecursively(rootDir0)
+ Utils.deleteRecursively(rootDir1)
+ } finally {
+ super.afterAll()
+ }
}
override def beforeEach() {
+ super.beforeEach()
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
diskBlockManager = new DiskBlockManager(blockManager, conf)
}
override def afterEach() {
- diskBlockManager.stop()
+ try {
+ diskBlockManager.stop()
+ } finally {
+ super.afterEach()
+ }
}
test("basic block creation") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 7c19531c18..5d36617cfc 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -30,11 +30,16 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
var tempDir: File = _
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
}
test("verify write metrics") {
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index ceecfd665b..0e36d7fda4 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -76,14 +76,19 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
override def beforeAll(): Unit = {
+ super.beforeAll()
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
}
}
override def afterAll(): Unit = {
- if (webDriver != null) {
- webDriver.quit()
+ try {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index a829b09902..934385fbca 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -38,14 +38,19 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
private var closureSerializer: SerializerInstance = null
override def beforeAll(): Unit = {
+ super.beforeAll()
sc = new SparkContext("local", "test")
closureSerializer = sc.env.closureSerializer.newInstance()
}
override def afterAll(): Unit = {
- sc.stop()
- sc = null
- closureSerializer = null
+ try {
+ sc.stop()
+ sc = null
+ closureSerializer = null
+ } finally {
+ super.afterAll()
+ }
}
// Some fields and methods to reference in inner closures later
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 101610e380..fbe7b95668 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -79,6 +79,10 @@ class SizeEstimatorSuite
System.setProperty("spark.test.useCompressedOops", "true")
}
+ override def afterEach(): Unit = {
+ super.afterEach()
+ }
+
test("simple classes") {
assertResult(16)(SizeEstimator.estimate(new DummyClass1))
assertResult(16)(SizeEstimator.estimate(new DummyClass2))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index 997f574e51..5f4d5f11bd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -46,8 +46,11 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
}
override def afterAll(): Unit = {
- Utils.deleteRecursively(tempDir)
- super.afterAll()
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterAll()
+ }
}
test("select as sparse vector") {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala b/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala
index c8a0bb1624..8f11bbc8e4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala
@@ -39,7 +39,10 @@ trait TempDirectory extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll(): Unit = {
- Utils.deleteRecursively(_tempDir)
- super.afterAll()
+ try {
+ Utils.deleteRecursively(_tempDir)
+ } finally {
+ super.afterAll()
+ }
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
index 525ab68c79..4f73b0809d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -25,18 +25,21 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
@transient var sc: SparkContext = _
override def beforeAll() {
+ super.beforeAll()
val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("test-cluster")
.set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data
sc = new SparkContext(conf)
- super.beforeAll()
}
override def afterAll() {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ } finally {
+ super.afterAll()
}
- super.afterAll()
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
index 378139593b..ebcd591465 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
@@ -38,12 +38,15 @@ trait MLlibTestSparkContext extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll() {
- sqlContext = null
- SQLContext.clearActive()
- if (sc != null) {
- sc.stop()
+ try {
+ sqlContext = null
+ SQLContext.clearActive()
+ if (sc != null) {
+ sc.stop()
+ }
+ sc = null
+ } finally {
+ super.afterAll()
}
- sc = null
- super.afterAll()
}
}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 1360f09e7f..05bf7a3aae 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -72,13 +72,16 @@ class ExecutorClassLoaderSuite
}
override def afterAll() {
- super.afterAll()
- if (classServer != null) {
- classServer.stop()
+ try {
+ if (classServer != null) {
+ classServer.stop()
+ }
+ Utils.deleteRecursively(tempDir1)
+ Utils.deleteRecursively(tempDir2)
+ SparkEnv.set(null)
+ } finally {
+ super.afterAll()
}
- Utils.deleteRecursively(tempDir1)
- Utils.deleteRecursively(tempDir2)
- SparkEnv.set(null)
}
test("child first") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index f5f446f14a..4d04138da0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSystemProperties,
+ Utils}
/**
* A input stream that records the times of restore() invoked
@@ -196,7 +197,8 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
* the checkpointing of a DStream's RDDs as well as the checkpointing of
* the whole DStream graph.
*/
-class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
+class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
+ with ResetSystemProperties {
var ssc: StreamingContext = null
@@ -208,9 +210,12 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
}
override def afterFunction() {
- super.afterFunction()
- if (ssc != null) { ssc.stop() }
- Utils.deleteRecursively(new File(checkpointDir))
+ try {
+ if (ssc != null) { ssc.stop() }
+ Utils.deleteRecursively(new File(checkpointDir))
+ } finally {
+ super.afterFunction()
+ }
}
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index 9b5e4dc819..e897de3cba 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -33,13 +33,18 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
private var ssc: StreamingContext = null
override def beforeAll(): Unit = {
+ super.beforeAll()
val sc = new SparkContext("local", "test")
ssc = new StreamingContext(sc, Seconds(1))
}
override def afterAll(): Unit = {
- ssc.stop(stopSparkContext = true)
- ssc = null
+ try {
+ ssc.stop(stopSparkContext = true)
+ ssc = null
+ } finally {
+ super.afterAll()
+ }
}
test("user provided closures are actually cleaned") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index bc223e648a..4c12ecc399 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -35,13 +35,18 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
private val batchDuration: Duration = Seconds(1)
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
ssc = new StreamingContext(new SparkContext(conf), batchDuration)
}
override def afterAll(): Unit = {
- ssc.stop(stopSparkContext = true)
+ try {
+ ssc.stop(stopSparkContext = true)
+ } finally {
+ super.afterAll()
+ }
}
before { assertPropertiesNotSet() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 6b21433f17..62d75a9e0e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -49,14 +49,19 @@ class MapWithStateSuite extends SparkFunSuite
}
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
sc = new SparkContext(conf)
}
override def afterAll(): Unit = {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
index 6d388d9624..e6d8fbd4d7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
@@ -33,7 +33,11 @@ import org.apache.spark.{SparkConf, SparkEnv}
class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll {
override def afterAll(): Unit = {
- StreamingContext.getActive().map { _.stop() }
+ try {
+ StreamingContext.getActive().map { _.stop() }
+ } finally {
+ super.afterAll()
+ }
}
testWithoutWAL("createBlockRDD creates empty BlockRDD when no block info") { receiverStream =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index a5744a9009..c4ecebcacf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -38,14 +38,19 @@ class UISeleniumSuite
implicit var webDriver: WebDriver = _
override def beforeAll(): Unit = {
+ super.beforeAll()
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
}
}
override def afterAll(): Unit = {
- if (webDriver != null) {
- webDriver.quit()
+ try {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
index aa95bd33dd..1640b9e6b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
@@ -36,6 +36,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
private var checkpointDir: File = _
override def beforeAll(): Unit = {
+ super.beforeAll()
sc = new SparkContext(
new SparkConf().setMaster("local").setAppName("MapWithStateRDDSuite"))
checkpointDir = Utils.createTempDir()
@@ -43,10 +44,14 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
}
override def afterAll(): Unit = {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ Utils.deleteRecursively(checkpointDir)
+ } finally {
+ super.afterAll()
}
- Utils.deleteRecursively(checkpointDir)
}
override def sparkContext: SparkContext = sc
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index cb017b798b..43833c4361 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -42,22 +42,32 @@ class WriteAheadLogBackedBlockRDDSuite
var dir: File = null
override def beforeEach(): Unit = {
+ super.beforeEach()
dir = Utils.createTempDir()
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(dir)
+ try {
+ Utils.deleteRecursively(dir)
+ } finally {
+ super.afterEach()
+ }
}
override def beforeAll(): Unit = {
+ super.beforeAll()
sparkContext = new SparkContext(conf)
blockManager = sparkContext.env.blockManager
}
override def afterAll(): Unit = {
// Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
- sparkContext.stop()
- System.clearProperty("spark.driver.port")
+ try {
+ sparkContext.stop()
+ System.clearProperty("spark.driver.port")
+ } finally {
+ super.afterAll()
+ }
}
test("Read data available in both block manager and write ahead log") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index ef1e89df31..beaae34535 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -432,6 +432,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
private val queueLength = PrivateMethod[Int]('getQueueLength)
override def beforeEach(): Unit = {
+ super.beforeEach()
wal = mock[WriteAheadLog]
walHandle = mock[WriteAheadLogRecordHandle]
walBatchingThreadPool = ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool")
@@ -439,8 +440,12 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
}
override def afterEach(): Unit = {
- if (walBatchingExecutionContext != null) {
- walBatchingExecutionContext.shutdownNow()
+ try {
+ if (walBatchingExecutionContext != null) {
+ walBatchingExecutionContext.shutdownNow()
+ }
+ } finally {
+ super.afterEach()
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 12494b0105..cd24c704ec 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -27,6 +27,7 @@ import scala.language.postfixOps
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
+import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -59,10 +60,13 @@ abstract class BaseYarnClusterSuite
protected var hadoopConfDir: File = _
private var logConfDir: File = _
+ var oldSystemProperties: Properties = null
+
def newYarnConfig(): YarnConfiguration
override def beforeAll() {
super.beforeAll()
+ oldSystemProperties = SerializationUtils.clone(System.getProperties)
tempDir = Utils.createTempDir()
logConfDir = new File(tempDir, "log4j")
@@ -115,9 +119,12 @@ abstract class BaseYarnClusterSuite
}
override def afterAll() {
- yarnCluster.stop()
- System.clearProperty("SPARK_YARN_MODE")
- super.afterAll()
+ try {
+ yarnCluster.stop()
+ } finally {
+ System.setProperties(oldSystemProperties)
+ super.afterAll()
+ }
}
protected def runSpark(
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e7f2501e78..7709c2f6e4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,12 +19,14 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap => MutableHashMap}
import scala.reflect.ClassTag
import scala.util.Try
+import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
@@ -39,16 +41,26 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, ResetSystemProperties}
-class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
+ with ResetSystemProperties {
+
+ var oldSystemProperties: Properties = null
override def beforeAll(): Unit = {
+ super.beforeAll()
+ oldSystemProperties = SerializationUtils.clone(System.getProperties)
System.setProperty("SPARK_YARN_MODE", "true")
}
override def afterAll(): Unit = {
- System.clearProperty("SPARK_YARN_MODE")
+ try {
+ System.setProperties(oldSystemProperties)
+ oldSystemProperties = null
+ } finally {
+ super.afterAll()
+ }
}
test("default Yarn application classpath") {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index bd80036c5c..57edbd6725 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -72,13 +72,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
var containerNum = 0
override def beforeEach() {
+ super.beforeEach()
rmClient = AMRMClient.createAMRMClient()
rmClient.init(conf)
rmClient.start()
}
override def afterEach() {
- rmClient.stop()
+ try {
+ rmClient.stop()
+ } finally {
+ super.afterEach()
+ }
}
class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 3fafc91a16..c2861c9d7f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -34,10 +34,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, ResetSystemProperties}
-class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging {
+class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
+ with ResetSystemProperties {
val hasBash =
try {
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 6aa8c814cd..5a426b86d1 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -34,6 +34,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
override def beforeEach(): Unit = {
+ super.beforeEach()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
@@ -54,17 +55,21 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
var s3: YarnShuffleService = null
override def afterEach(): Unit = {
- if (s1 != null) {
- s1.stop()
- s1 = null
- }
- if (s2 != null) {
- s2.stop()
- s2 = null
- }
- if (s3 != null) {
- s3.stop()
- s3 = null
+ try {
+ if (s1 != null) {
+ s1.stop()
+ s1 = null
+ }
+ if (s2 != null) {
+ s2.stop()
+ s2 = null
+ }
+ if (s3 != null) {
+ s3.stop()
+ s3 = null
+ }
+ } finally {
+ super.afterEach()
}
}