aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-08-18 17:00:13 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-18 17:00:13 -0700
commit9108eff74a2815986fd067b273c2a344b6315405 (patch)
treec3edec4f0880407b53d025a6d8052ab9bdc3c9f3 /streaming
parentc635a16f64c939182196b46725ef2d00ed107cca (diff)
downloadspark-9108eff74a2815986fd067b273c2a344b6315405.tar.gz
spark-9108eff74a2815986fd067b273c2a344b6315405.tar.bz2
spark-9108eff74a2815986fd067b273c2a344b6315405.zip
[SPARK-10098] [STREAMING] [TEST] Cleanup active context after test in FailureSuite
Failures in streaming.FailureSuite can leak StreamingContext and SparkContext which fails all subsequent tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8289 from tdas/SPARK-10098.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala27
1 files changed, 17 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 0c4c06534a..e82c2fa4e7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -17,25 +17,32 @@
package org.apache.spark.streaming
-import org.apache.spark.Logging
+import java.io.File
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkFunSuite, Logging}
import org.apache.spark.util.Utils
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
-class FailureSuite extends TestSuiteBase with Logging {
+class FailureSuite extends SparkFunSuite with BeforeAndAfter with Logging {
- val directory = Utils.createTempDir()
- val numBatches = 30
+ private val batchDuration: Duration = Milliseconds(1000)
+ private val numBatches = 30
+ private var directory: File = null
- override def batchDuration: Duration = Milliseconds(1000)
-
- override def useManualClock: Boolean = false
+ before {
+ directory = Utils.createTempDir()
+ }
- override def afterFunction() {
- Utils.deleteRecursively(directory)
- super.afterFunction()
+ after {
+ if (directory != null) {
+ Utils.deleteRecursively(directory)
+ }
+ StreamingContext.getActive().foreach { _.stop() }
}
test("multiple failures with map") {