diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-18 17:00:13 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-18 17:00:13 -0700 |
commit | 9108eff74a2815986fd067b273c2a344b6315405 (patch) | |
tree | c3edec4f0880407b53d025a6d8052ab9bdc3c9f3 | |
parent | c635a16f64c939182196b46725ef2d00ed107cca (diff) | |
download | spark-9108eff74a2815986fd067b273c2a344b6315405.tar.gz spark-9108eff74a2815986fd067b273c2a344b6315405.tar.bz2 spark-9108eff74a2815986fd067b273c2a344b6315405.zip |
[SPARK-10098] [STREAMING] [TEST] Cleanup active context after test in FailureSuite
Failures in streaming.FailureSuite can leak StreamingContext and SparkContext which fails all subsequent tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #8289 from tdas/SPARK-10098.
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala index 0c4c06534a..e82c2fa4e7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala @@ -17,25 +17,32 @@ package org.apache.spark.streaming -import org.apache.spark.Logging +import java.io.File + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkFunSuite, Logging} import org.apache.spark.util.Utils /** * This testsuite tests master failures at random times while the stream is running using * the real clock. */ -class FailureSuite extends TestSuiteBase with Logging { +class FailureSuite extends SparkFunSuite with BeforeAndAfter with Logging { - val directory = Utils.createTempDir() - val numBatches = 30 + private val batchDuration: Duration = Milliseconds(1000) + private val numBatches = 30 + private var directory: File = null - override def batchDuration: Duration = Milliseconds(1000) - - override def useManualClock: Boolean = false + before { + directory = Utils.createTempDir() + } - override def afterFunction() { - Utils.deleteRecursively(directory) - super.afterFunction() + after { + if (directory != null) { + Utils.deleteRecursively(directory) + } + StreamingContext.getActive().foreach { _.stop() } } test("multiple failures with map") { |