aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/CheckpointSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/FileSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/HashShuffleSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/JobCancellationSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/LocalSparkContext.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/SharedSparkContext.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/SortShuffleSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala18
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala35
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala16
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala7
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala7
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala11
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala15
-rw-r--r--repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala7
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala11
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala16
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala9
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala13
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala18
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala7
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala5
-rw-r--r--yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala27
49 files changed, 338 insertions, 142 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 0b19861fc4..f200ff36c7 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -42,6 +42,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
private val conf: SparkConf = new SparkConf(loadDefaults = false)
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
MockitoAnnotations.initMocks(this)
@@ -55,7 +56,11 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
}
test("commit shuffle files multiple times") {
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 553d46285a..390764ba24 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -256,8 +256,11 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
}
override def afterEach(): Unit = {
- super.afterEach()
- Utils.deleteRecursively(checkpointDir)
+ try {
+ Utils.deleteRecursively(checkpointDir)
+ } finally {
+ super.afterEach()
+ }
}
override def sparkContext: SparkContext = sc
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 1c775bcb3d..eb3fb99747 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -35,6 +35,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
var rpcHandler: ExternalShuffleBlockHandler = _
override def beforeAll() {
+ super.beforeAll()
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
val transportContext = new TransportContext(transportConf, rpcHandler)
@@ -46,7 +47,11 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
}
override def afterAll() {
- server.close()
+ try {
+ server.close()
+ } finally {
+ super.afterAll()
+ }
}
// This test ensures that the external shuffle service is actually in use for the other tests.
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 1255e71af6..2c32b69715 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -75,8 +75,11 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}
override def afterAll() {
- super.afterAll()
- Utils.deleteRecursively(tmpDir)
+ try {
+ Utils.deleteRecursively(tmpDir)
+ } finally {
+ super.afterAll()
+ }
}
test("Distributing files locally") {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index fdb00aafc4..f6a7f4375f 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -44,8 +44,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
override def afterEach() {
- super.afterEach()
- Utils.deleteRecursively(tempDir)
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
}
test("text files") {
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
index 19180e88eb..10794235ed 100644
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -24,6 +24,7 @@ class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with hash-based shuffle.
override def beforeAll() {
+ super.beforeAll()
conf.set("spark.shuffle.manager", "hash")
}
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 3cd80c0f7d..9b43341576 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -66,6 +66,7 @@ class HeartbeatReceiverSuite
* that uses a manual clock.
*/
override def beforeEach(): Unit = {
+ super.beforeEach()
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 1168eb0b80..e13a442463 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -38,8 +38,11 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
with LocalSparkContext {
override def afterEach() {
- super.afterEach()
- resetSparkContext()
+ try {
+ resetSparkContext()
+ } finally {
+ super.afterEach()
+ }
}
test("local mode, FIFO scheduler") {
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 8bf2e55def..214681970a 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -28,13 +28,16 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _
override def beforeAll() {
- InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
}
override def afterEach() {
- resetSparkContext()
- super.afterEach()
+ try {
+ resetSparkContext()
+ } finally {
+ super.afterEach()
+ }
}
def resetSparkContext(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 26b95c0678..e0226803bb 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark
import java.io.File
-import org.apache.spark.util.{SparkConfWithEnv, Utils}
+import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
-class SecurityManagerSuite extends SparkFunSuite {
+class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
test("set security with conf") {
val conf = new SparkConf
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 3d2700b7e6..858bc742e0 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -30,13 +30,16 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
var conf = new SparkConf(false)
override def beforeAll() {
- _sc = new SparkContext("local[4]", "test", conf)
super.beforeAll()
+ _sc = new SparkContext("local[4]", "test", conf)
}
override def afterAll() {
- LocalSparkContext.stop(_sc)
- _sc = null
- super.afterAll()
+ try {
+ LocalSparkContext.stop(_sc)
+ _sc = null
+ } finally {
+ super.afterAll()
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index d78c99c2e1..73638d9b13 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -24,6 +24,7 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
override def beforeAll() {
+ super.beforeAll()
conf.set("spark.shuffle.blockTransferService", "netty")
}
}
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index b8ab227517..5354731465 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -37,10 +37,12 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
private var tempDir: File = _
override def beforeAll() {
+ super.beforeAll()
conf.set("spark.shuffle.manager", "sort")
}
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
conf.set("spark.local.dir", tempDir.getAbsolutePath)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 314517d296..85c1c1bbf3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -71,15 +71,18 @@ class StandaloneDynamicAllocationSuite
}
override def afterAll(): Unit = {
- masterRpcEnv.shutdown()
- workerRpcEnvs.foreach(_.shutdown())
- master.stop()
- workers.foreach(_.stop())
- masterRpcEnv = null
- workerRpcEnvs = null
- master = null
- workers = null
- super.afterAll()
+ try {
+ masterRpcEnv.shutdown()
+ workerRpcEnvs.foreach(_.shutdown())
+ master.stop()
+ workers.foreach(_.stop())
+ masterRpcEnv = null
+ workerRpcEnvs = null
+ master = null
+ workers = null
+ } finally {
+ super.afterAll()
+ }
}
test("dynamic allocation default behavior") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 1e5c05a73f..415e2b37db 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -63,15 +63,18 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
}
override def afterAll(): Unit = {
- workerRpcEnvs.foreach(_.shutdown())
- masterRpcEnv.shutdown()
- workers.foreach(_.stop())
- master.stop()
- workerRpcEnvs = null
- masterRpcEnv = null
- workers = null
- master = null
- super.afterAll()
+ try {
+ workerRpcEnvs.foreach(_.shutdown())
+ masterRpcEnv.shutdown()
+ workers.foreach(_.stop())
+ master.stop()
+ workerRpcEnvs = null
+ masterRpcEnv = null
+ workers = null
+ master = null
+ } finally {
+ super.afterAll()
+ }
}
test("interface methods of AppClient using local Master") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 4b7fd4f13b..18659fc0c1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.mock.MockitoSugar
import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark.util.ResetSystemProperties
/**
* A collection of tests against the historyserver, including comparing responses from the json
@@ -43,7 +44,7 @@ import org.apache.spark.ui.{SparkUI, UIUtils}
* are considered part of Spark's public api.
*/
class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
- with JsonTestUtils {
+ with JsonTestUtils with ResetSystemProperties {
private val logDir = new File("src/test/resources/spark-events")
private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 9693e32bf6..fa39aa2cb1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -43,8 +43,12 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
private var server: Option[RestSubmissionServer] = None
override def afterEach() {
- rpcEnv.foreach(_.shutdown())
- server.foreach(_.stop())
+ try {
+ rpcEnv.foreach(_.shutdown())
+ server.foreach(_.stop())
+ } finally {
+ super.afterEach()
+ }
}
test("construct submit request") {
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 8a199459c1..24184b02cb 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -47,6 +47,7 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite with BeforeAndAfterAl
// hard-to-reproduce test failures, since any suites that were run after this one would inherit
// the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
// we disable FileSystem caching in this suite.
+ super.beforeAll()
val conf = new SparkConf().set("spark.hadoop.fs.file.impl.disable.cache", "true")
sc = new SparkContext("local", "test", conf)
@@ -59,7 +60,11 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite with BeforeAndAfterAl
}
override def afterAll() {
- sc.stop()
+ try {
+ sc.stop()
+ } finally {
+ super.afterAll()
+ }
}
private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index 6f8e8a7ac6..92daf4e6a2 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -31,14 +31,18 @@ class NettyBlockTransferServiceSuite
private var service1: NettyBlockTransferService = _
override def afterEach() {
- if (service0 != null) {
- service0.close()
- service0 = null
- }
+ try {
+ if (service0 != null) {
+ service0.close()
+ service0 = null
+ }
- if (service1 != null) {
- service1.close()
- service1 = null
+ if (service1 != null) {
+ service1.close()
+ service1 = null
+ }
+ } finally {
+ super.afterEach()
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index de015ebd5d..d18bde790b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -34,12 +34,17 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
@transient private var sc: SparkContext = _
override def beforeAll() {
+ super.beforeAll()
sc = new SparkContext("local[2]", "test")
}
override def afterAll() {
- LocalSparkContext.stop(sc)
- sc = null
+ try {
+ LocalSparkContext.stop(sc)
+ sc = null
+ } finally {
+ super.afterAll()
+ }
}
lazy val zeroPartRdd = new EmptyRDD[Int](sc)
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 5103eb74b2..3a22a9850a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.{RDDBlockId, StorageLevel}
class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
override def beforeEach(): Unit = {
+ super.beforeEach()
sc = new SparkContext("local[2]", "test")
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 7b3a17c172..9c850c0da5 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -44,6 +44,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
var env: RpcEnv = _
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf()
env = createRpcEnv(conf, "local", 0)
@@ -53,10 +54,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
override def afterAll(): Unit = {
- if (env != null) {
- env.shutdown()
+ try {
+ if (env != null) {
+ env.shutdown()
+ }
+ SparkEnv.set(null)
+ } finally {
+ super.afterAll()
}
- SparkEnv.set(null)
}
def createRpcEnv(conf: SparkConf, name: String, port: Int, clientMode: Boolean = false): RpcEnv
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
index 2d5e9d66b2..683aaa3aab 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
@@ -29,6 +29,7 @@ class SerializationDebuggerSuite extends SparkFunSuite with BeforeAndAfterEach {
import SerializationDebugger.find
override def beforeEach(): Unit = {
+ super.beforeEach()
SerializationDebugger.enableDebugging = true
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index d3b1b2b620..bb331bb385 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -55,6 +55,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
outputFile = File.createTempFile("shuffle", null, tempDir)
taskMetrics = new TaskMetrics
@@ -119,9 +120,13 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
- blockIdToFileMap.clear()
- temporaryFilesCreated.clear()
+ try {
+ Utils.deleteRecursively(tempDir)
+ blockIdToFileMap.clear()
+ temporaryFilesCreated.clear()
+ } finally {
+ super.afterEach()
+ }
}
test("write empty iterator") {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bf49be3d4c..2224a444c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -79,6 +79,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
override def beforeEach(): Unit = {
+ super.beforeEach()
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
@@ -97,22 +98,26 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
override def afterEach(): Unit = {
- if (store != null) {
- store.stop()
- store = null
- }
- if (store2 != null) {
- store2.stop()
- store2 = null
- }
- if (store3 != null) {
- store3.stop()
- store3 = null
+ try {
+ if (store != null) {
+ store.stop()
+ store = null
+ }
+ if (store2 != null) {
+ store2.stop()
+ store2 = null
+ }
+ if (store3 != null) {
+ store3.stop()
+ store3 = null
+ }
+ rpcEnv.shutdown()
+ rpcEnv.awaitTermination()
+ rpcEnv = null
+ master = null
+ } finally {
+ super.afterEach()
}
- rpcEnv.shutdown()
- rpcEnv.awaitTermination()
- rpcEnv = null
- master = null
}
test("StorageLevel object caching") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 688f56f466..69e17461df 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -45,19 +45,27 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
}
override def afterAll() {
- super.afterAll()
- Utils.deleteRecursively(rootDir0)
- Utils.deleteRecursively(rootDir1)
+ try {
+ Utils.deleteRecursively(rootDir0)
+ Utils.deleteRecursively(rootDir1)
+ } finally {
+ super.afterAll()
+ }
}
override def beforeEach() {
+ super.beforeEach()
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
diskBlockManager = new DiskBlockManager(blockManager, conf)
}
override def afterEach() {
- diskBlockManager.stop()
+ try {
+ diskBlockManager.stop()
+ } finally {
+ super.afterEach()
+ }
}
test("basic block creation") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 7c19531c18..5d36617cfc 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -30,11 +30,16 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
var tempDir: File = _
override def beforeEach(): Unit = {
+ super.beforeEach()
tempDir = Utils.createTempDir()
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
}
test("verify write metrics") {
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index ceecfd665b..0e36d7fda4 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -76,14 +76,19 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
override def beforeAll(): Unit = {
+ super.beforeAll()
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
}
}
override def afterAll(): Unit = {
- if (webDriver != null) {
- webDriver.quit()
+ try {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index a829b09902..934385fbca 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -38,14 +38,19 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
private var closureSerializer: SerializerInstance = null
override def beforeAll(): Unit = {
+ super.beforeAll()
sc = new SparkContext("local", "test")
closureSerializer = sc.env.closureSerializer.newInstance()
}
override def afterAll(): Unit = {
- sc.stop()
- sc = null
- closureSerializer = null
+ try {
+ sc.stop()
+ sc = null
+ closureSerializer = null
+ } finally {
+ super.afterAll()
+ }
}
// Some fields and methods to reference in inner closures later
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 101610e380..fbe7b95668 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -79,6 +79,10 @@ class SizeEstimatorSuite
System.setProperty("spark.test.useCompressedOops", "true")
}
+ override def afterEach(): Unit = {
+ super.afterEach()
+ }
+
test("simple classes") {
assertResult(16)(SizeEstimator.estimate(new DummyClass1))
assertResult(16)(SizeEstimator.estimate(new DummyClass2))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index 997f574e51..5f4d5f11bd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -46,8 +46,11 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
}
override def afterAll(): Unit = {
- Utils.deleteRecursively(tempDir)
- super.afterAll()
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterAll()
+ }
}
test("select as sparse vector") {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala b/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala
index c8a0bb1624..8f11bbc8e4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/TempDirectory.scala
@@ -39,7 +39,10 @@ trait TempDirectory extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll(): Unit = {
- Utils.deleteRecursively(_tempDir)
- super.afterAll()
+ try {
+ Utils.deleteRecursively(_tempDir)
+ } finally {
+ super.afterAll()
+ }
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
index 525ab68c79..4f73b0809d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -25,18 +25,21 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
@transient var sc: SparkContext = _
override def beforeAll() {
+ super.beforeAll()
val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("test-cluster")
.set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data
sc = new SparkContext(conf)
- super.beforeAll()
}
override def afterAll() {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ } finally {
+ super.afterAll()
}
- super.afterAll()
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
index 378139593b..ebcd591465 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
@@ -38,12 +38,15 @@ trait MLlibTestSparkContext extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll() {
- sqlContext = null
- SQLContext.clearActive()
- if (sc != null) {
- sc.stop()
+ try {
+ sqlContext = null
+ SQLContext.clearActive()
+ if (sc != null) {
+ sc.stop()
+ }
+ sc = null
+ } finally {
+ super.afterAll()
}
- sc = null
- super.afterAll()
}
}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 1360f09e7f..05bf7a3aae 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -72,13 +72,16 @@ class ExecutorClassLoaderSuite
}
override def afterAll() {
- super.afterAll()
- if (classServer != null) {
- classServer.stop()
+ try {
+ if (classServer != null) {
+ classServer.stop()
+ }
+ Utils.deleteRecursively(tempDir1)
+ Utils.deleteRecursively(tempDir2)
+ SparkEnv.set(null)
+ } finally {
+ super.afterAll()
}
- Utils.deleteRecursively(tempDir1)
- Utils.deleteRecursively(tempDir2)
- SparkEnv.set(null)
}
test("child first") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index f5f446f14a..4d04138da0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSystemProperties,
+ Utils}
/**
* A input stream that records the times of restore() invoked
@@ -196,7 +197,8 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
* the checkpointing of a DStream's RDDs as well as the checkpointing of
* the whole DStream graph.
*/
-class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
+class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
+ with ResetSystemProperties {
var ssc: StreamingContext = null
@@ -208,9 +210,12 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
}
override def afterFunction() {
- super.afterFunction()
- if (ssc != null) { ssc.stop() }
- Utils.deleteRecursively(new File(checkpointDir))
+ try {
+ if (ssc != null) { ssc.stop() }
+ Utils.deleteRecursively(new File(checkpointDir))
+ } finally {
+ super.afterFunction()
+ }
}
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index 9b5e4dc819..e897de3cba 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -33,13 +33,18 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
private var ssc: StreamingContext = null
override def beforeAll(): Unit = {
+ super.beforeAll()
val sc = new SparkContext("local", "test")
ssc = new StreamingContext(sc, Seconds(1))
}
override def afterAll(): Unit = {
- ssc.stop(stopSparkContext = true)
- ssc = null
+ try {
+ ssc.stop(stopSparkContext = true)
+ ssc = null
+ } finally {
+ super.afterAll()
+ }
}
test("user provided closures are actually cleaned") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index bc223e648a..4c12ecc399 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -35,13 +35,18 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
private val batchDuration: Duration = Seconds(1)
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
ssc = new StreamingContext(new SparkContext(conf), batchDuration)
}
override def afterAll(): Unit = {
- ssc.stop(stopSparkContext = true)
+ try {
+ ssc.stop(stopSparkContext = true)
+ } finally {
+ super.afterAll()
+ }
}
before { assertPropertiesNotSet() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 6b21433f17..62d75a9e0e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -49,14 +49,19 @@ class MapWithStateSuite extends SparkFunSuite
}
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
sc = new SparkContext(conf)
}
override def afterAll(): Unit = {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
index 6d388d9624..e6d8fbd4d7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
@@ -33,7 +33,11 @@ import org.apache.spark.{SparkConf, SparkEnv}
class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll {
override def afterAll(): Unit = {
- StreamingContext.getActive().map { _.stop() }
+ try {
+ StreamingContext.getActive().map { _.stop() }
+ } finally {
+ super.afterAll()
+ }
}
testWithoutWAL("createBlockRDD creates empty BlockRDD when no block info") { receiverStream =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index a5744a9009..c4ecebcacf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -38,14 +38,19 @@ class UISeleniumSuite
implicit var webDriver: WebDriver = _
override def beforeAll(): Unit = {
+ super.beforeAll()
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
}
}
override def afterAll(): Unit = {
- if (webDriver != null) {
- webDriver.quit()
+ try {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
index aa95bd33dd..1640b9e6b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
@@ -36,6 +36,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
private var checkpointDir: File = _
override def beforeAll(): Unit = {
+ super.beforeAll()
sc = new SparkContext(
new SparkConf().setMaster("local").setAppName("MapWithStateRDDSuite"))
checkpointDir = Utils.createTempDir()
@@ -43,10 +44,14 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
}
override def afterAll(): Unit = {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ Utils.deleteRecursively(checkpointDir)
+ } finally {
+ super.afterAll()
}
- Utils.deleteRecursively(checkpointDir)
}
override def sparkContext: SparkContext = sc
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index cb017b798b..43833c4361 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -42,22 +42,32 @@ class WriteAheadLogBackedBlockRDDSuite
var dir: File = null
override def beforeEach(): Unit = {
+ super.beforeEach()
dir = Utils.createTempDir()
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(dir)
+ try {
+ Utils.deleteRecursively(dir)
+ } finally {
+ super.afterEach()
+ }
}
override def beforeAll(): Unit = {
+ super.beforeAll()
sparkContext = new SparkContext(conf)
blockManager = sparkContext.env.blockManager
}
override def afterAll(): Unit = {
// Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
- sparkContext.stop()
- System.clearProperty("spark.driver.port")
+ try {
+ sparkContext.stop()
+ System.clearProperty("spark.driver.port")
+ } finally {
+ super.afterAll()
+ }
}
test("Read data available in both block manager and write ahead log") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index ef1e89df31..beaae34535 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -432,6 +432,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
private val queueLength = PrivateMethod[Int]('getQueueLength)
override def beforeEach(): Unit = {
+ super.beforeEach()
wal = mock[WriteAheadLog]
walHandle = mock[WriteAheadLogRecordHandle]
walBatchingThreadPool = ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool")
@@ -439,8 +440,12 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
}
override def afterEach(): Unit = {
- if (walBatchingExecutionContext != null) {
- walBatchingExecutionContext.shutdownNow()
+ try {
+ if (walBatchingExecutionContext != null) {
+ walBatchingExecutionContext.shutdownNow()
+ }
+ } finally {
+ super.afterEach()
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 12494b0105..cd24c704ec 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -27,6 +27,7 @@ import scala.language.postfixOps
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
+import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -59,10 +60,13 @@ abstract class BaseYarnClusterSuite
protected var hadoopConfDir: File = _
private var logConfDir: File = _
+ var oldSystemProperties: Properties = null
+
def newYarnConfig(): YarnConfiguration
override def beforeAll() {
super.beforeAll()
+ oldSystemProperties = SerializationUtils.clone(System.getProperties)
tempDir = Utils.createTempDir()
logConfDir = new File(tempDir, "log4j")
@@ -115,9 +119,12 @@ abstract class BaseYarnClusterSuite
}
override def afterAll() {
- yarnCluster.stop()
- System.clearProperty("SPARK_YARN_MODE")
- super.afterAll()
+ try {
+ yarnCluster.stop()
+ } finally {
+ System.setProperties(oldSystemProperties)
+ super.afterAll()
+ }
}
protected def runSpark(
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e7f2501e78..7709c2f6e4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,12 +19,14 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap => MutableHashMap}
import scala.reflect.ClassTag
import scala.util.Try
+import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
@@ -39,16 +41,26 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, ResetSystemProperties}
-class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
+ with ResetSystemProperties {
+
+ var oldSystemProperties: Properties = null
override def beforeAll(): Unit = {
+ super.beforeAll()
+ oldSystemProperties = SerializationUtils.clone(System.getProperties)
System.setProperty("SPARK_YARN_MODE", "true")
}
override def afterAll(): Unit = {
- System.clearProperty("SPARK_YARN_MODE")
+ try {
+ System.setProperties(oldSystemProperties)
+ oldSystemProperties = null
+ } finally {
+ super.afterAll()
+ }
}
test("default Yarn application classpath") {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index bd80036c5c..57edbd6725 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -72,13 +72,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
var containerNum = 0
override def beforeEach() {
+ super.beforeEach()
rmClient = AMRMClient.createAMRMClient()
rmClient.init(conf)
rmClient.start()
}
override def afterEach() {
- rmClient.stop()
+ try {
+ rmClient.stop()
+ } finally {
+ super.afterEach()
+ }
}
class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 3fafc91a16..c2861c9d7f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -34,10 +34,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, ResetSystemProperties}
-class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging {
+class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
+ with ResetSystemProperties {
val hasBash =
try {
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 6aa8c814cd..5a426b86d1 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -34,6 +34,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
override def beforeEach(): Unit = {
+ super.beforeEach()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
@@ -54,17 +55,21 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
var s3: YarnShuffleService = null
override def afterEach(): Unit = {
- if (s1 != null) {
- s1.stop()
- s1 = null
- }
- if (s2 != null) {
- s2.stop()
- s2 = null
- }
- if (s3 != null) {
- s3.stop()
- s3 = null
+ try {
+ if (s1 != null) {
+ s1.stop()
+ s1 = null
+ }
+ if (s2 != null) {
+ s2.stop()
+ s2 = null
+ }
+ if (s3 != null) {
+ s3.stop()
+ s3 = null
+ }
+ } finally {
+ super.afterEach()
}
}