author    Kazuaki Ishizaki <ishizaki@jp.ibm.com>    2015-12-24 13:37:28 +0000
committer Sean Owen <sowen@cloudera.com>            2015-12-24 13:37:28 +0000
commit    392046611837a3a740ff97fa8177ca7c12316fb7 (patch)
tree      3c5149701ceaec57d12dff971fc0a34c05669c31 /streaming
parent    9e85bb71ad2d7d3a9da0cb8853f3216d37e6ff47 (diff)
[SPARK-12311][CORE] Restore the previous value of the "os.arch" property in test suites after a test forces a specific value for it
Restore the original value of the os.arch property after each test. Since some tests force a specific value for the os.arch property, we need to restore the original value afterwards.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #10289 from kiszk/SPARK-12311.
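The fix mixes Spark's ResetSystemProperties test trait into suites that mutate system properties: the trait snapshots the JVM's system properties before each test and restores the snapshot afterwards. The sketch below is an illustrative reconstruction of that idea, not Spark's exact implementation; the trait body and the putAll-based copy are assumptions, while the real trait lives at org.apache.spark.util.ResetSystemProperties.

import java.util.Properties

import org.scalatest.{BeforeAndAfterEach, Suite}

// Illustrative sketch in the spirit of org.apache.spark.util.ResetSystemProperties;
// the details here are a plausible reconstruction, not Spark's actual code.
trait ResetSystemPropertiesSketch extends BeforeAndAfterEach { this: Suite =>

  private var savedProperties: Properties = _

  override def beforeEach(): Unit = {
    // Copy (not alias) the live Properties object so later mutations don't leak in.
    savedProperties = new Properties()
    savedProperties.putAll(System.getProperties)
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      // Runs even if the test or super.afterEach() throws, so a forced
      // "os.arch" value cannot bleed into later suites.
      System.setProperties(savedProperties)
    }
  }
}

For the restore step in afterEach to be reachable, every suite's own teardown must still call super.afterEach()/super.afterAll(); that is exactly what the try/finally rewrites in this commit guarantee.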
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                      | 15
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala                  |  9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala                    |  7
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala                    |  9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala            |  6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala                      |  9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala             | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala              |  9
9 files changed, 70 insertions, 21 deletions
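All nine suites receive the same two-part structural change, visible in the hunks below: setup hooks call super.beforeAll()/super.beforeEach() first so mixed-in traits can take their snapshot, and teardown hooks wrap suite-specific cleanup in try/finally so super.afterAll()/super.afterEach() runs even when cleanup throws. A minimal illustrative sketch of the pattern (the suite name and the setup/cleanup bodies are placeholders, not code from this commit):

import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Placeholder suite illustrating the setup/teardown ordering used by this commit.
class ExampleSuiteSketch extends FunSuite with BeforeAndAfterAll {

  override def beforeAll(): Unit = {
    super.beforeAll()   // run mixed-in traits first (e.g. property snapshotting)
    // ... suite-specific setup, which may set system properties ...
  }

  override def afterAll(): Unit = {
    try {
      // ... suite-specific cleanup that may throw (stop contexts, delete dirs) ...
    } finally {
      super.afterAll()  // always runs, so snapshots are restored regardless
    }
  }
}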
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index f5f446f14a..4d04138da0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSystemProperties,
+ Utils}
/**
 * An input stream that records the times restore() is invoked
@@ -196,7 +197,8 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
* the checkpointing of a DStream's RDDs as well as the checkpointing of
* the whole DStream graph.
*/
-class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
+class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
+ with ResetSystemProperties {
var ssc: StreamingContext = null
@@ -208,9 +210,12 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
}
override def afterFunction() {
- super.afterFunction()
- if (ssc != null) { ssc.stop() }
- Utils.deleteRecursively(new File(checkpointDir))
+ try {
+ if (ssc != null) { ssc.stop() }
+ Utils.deleteRecursively(new File(checkpointDir))
+ } finally {
+ super.afterFunction()
+ }
}
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index 9b5e4dc819..e897de3cba 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -33,13 +33,18 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
private var ssc: StreamingContext = null
override def beforeAll(): Unit = {
+ super.beforeAll()
val sc = new SparkContext("local", "test")
ssc = new StreamingContext(sc, Seconds(1))
}
override def afterAll(): Unit = {
- ssc.stop(stopSparkContext = true)
- ssc = null
+ try {
+ ssc.stop(stopSparkContext = true)
+ ssc = null
+ } finally {
+ super.afterAll()
+ }
}
test("user provided closures are actually cleaned") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index bc223e648a..4c12ecc399 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -35,13 +35,18 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
private val batchDuration: Duration = Seconds(1)
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
ssc = new StreamingContext(new SparkContext(conf), batchDuration)
}
override def afterAll(): Unit = {
- ssc.stop(stopSparkContext = true)
+ try {
+ ssc.stop(stopSparkContext = true)
+ } finally {
+ super.afterAll()
+ }
}
before { assertPropertiesNotSet() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 6b21433f17..62d75a9e0e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -49,14 +49,19 @@ class MapWithStateSuite extends SparkFunSuite
}
override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
sc = new SparkContext(conf)
}
override def afterAll(): Unit = {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
index 6d388d9624..e6d8fbd4d7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
@@ -33,7 +33,11 @@ import org.apache.spark.{SparkConf, SparkEnv}
class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll {
override def afterAll(): Unit = {
- StreamingContext.getActive().map { _.stop() }
+ try {
+ StreamingContext.getActive().map { _.stop() }
+ } finally {
+ super.afterAll()
+ }
}
testWithoutWAL("createBlockRDD creates empty BlockRDD when no block info") { receiverStream =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index a5744a9009..c4ecebcacf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -38,14 +38,19 @@ class UISeleniumSuite
implicit var webDriver: WebDriver = _
override def beforeAll(): Unit = {
+ super.beforeAll()
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
}
}
override def afterAll(): Unit = {
- if (webDriver != null) {
- webDriver.quit()
+ try {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ } finally {
+ super.afterAll()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
index aa95bd33dd..1640b9e6b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
@@ -36,6 +36,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
private var checkpointDir: File = _
override def beforeAll(): Unit = {
+ super.beforeAll()
sc = new SparkContext(
new SparkConf().setMaster("local").setAppName("MapWithStateRDDSuite"))
checkpointDir = Utils.createTempDir()
@@ -43,10 +44,14 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
}
override def afterAll(): Unit = {
- if (sc != null) {
- sc.stop()
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ Utils.deleteRecursively(checkpointDir)
+ } finally {
+ super.afterAll()
}
- Utils.deleteRecursively(checkpointDir)
}
override def sparkContext: SparkContext = sc
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index cb017b798b..43833c4361 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -42,22 +42,32 @@ class WriteAheadLogBackedBlockRDDSuite
var dir: File = null
override def beforeEach(): Unit = {
+ super.beforeEach()
dir = Utils.createTempDir()
}
override def afterEach(): Unit = {
- Utils.deleteRecursively(dir)
+ try {
+ Utils.deleteRecursively(dir)
+ } finally {
+ super.afterEach()
+ }
}
override def beforeAll(): Unit = {
+ super.beforeAll()
sparkContext = new SparkContext(conf)
blockManager = sparkContext.env.blockManager
}
override def afterAll(): Unit = {
// Copied from LocalSparkContext; simpler than introducing test dependencies on core tests.
- sparkContext.stop()
- System.clearProperty("spark.driver.port")
+ try {
+ sparkContext.stop()
+ System.clearProperty("spark.driver.port")
+ } finally {
+ super.afterAll()
+ }
}
test("Read data available in both block manager and write ahead log") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index ef1e89df31..beaae34535 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -432,6 +432,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
private val queueLength = PrivateMethod[Int]('getQueueLength)
override def beforeEach(): Unit = {
+ super.beforeEach()
wal = mock[WriteAheadLog]
walHandle = mock[WriteAheadLogRecordHandle]
walBatchingThreadPool = ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool")
@@ -439,8 +440,12 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
}
override def afterEach(): Unit = {
- if (walBatchingExecutionContext != null) {
- walBatchingExecutionContext.shutdownNow()
+ try {
+ if (walBatchingExecutionContext != null) {
+ walBatchingExecutionContext.shutdownNow()
+ }
+ } finally {
+ super.afterEach()
}
}