author    Tathagata Das <tathagata.das1565@gmail.com>  2016-02-11 10:10:36 -0800
committer Shixiong Zhu <shixiong@databricks.com>       2016-02-11 10:10:36 -0800
commit    219a74a7c2d3b858224c4738190ccc92d7cbf06d (patch)
tree      82106468772283354dd7c66274bf149208108bea /streaming/src
parent    13c17cbb0530d52b9a08d5197017c96501d99e8c (diff)
[STREAMING][TEST] Fix flaky streaming.FailureSuite
Under some corner cases, the test suite failed to shut down the SparkContext, causing cascaded failures. This fix does two things:
- Makes sure no SparkContext is active after every test
- Makes sure the StreamingContext is always shut down (this also prevents leaking StreamingContexts, just in case)

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #11166 from tdas/fix-failuresuite.
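For context, here is a minimal sketch of the cleanup pattern this commit wires into the suite's after-hook, assuming the Spark 1.6-era test APIs visible in the diff below (SparkFunSuite is Spark-internal, and the suite name here is illustrative, not the actual FailureSuite code):

import org.scalatest.BeforeAndAfter
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext

class CleanupSketchSuite extends SparkFunSuite with BeforeAndAfter {
  after {
    // Stop the active StreamingContext, if any.
    StreamingContext.getActive().foreach(_.stop())
    // Stop any active SparkContext: getOrCreate returns the running
    // context when one exists, so stop() tears it down either way.
    SparkContext.getOrCreate(
      new SparkConf().setMaster("local").setAppName("bla")).stop()
  }
}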
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala       5
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala  3
2 files changed, 6 insertions, 2 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6a0b0a1d47..31e159e968 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark._
import org.apache.spark.util.Utils
/**
@@ -43,6 +43,9 @@ class FailureSuite extends SparkFunSuite with BeforeAndAfter with Logging {
Utils.deleteRecursively(directory)
}
StreamingContext.getActive().foreach { _.stop() }
+
+ // Stop SparkContext if active
+ SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("bla")).stop()
}
test("multiple failures with map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index a02d49eced..faa9c4f0cb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -242,6 +242,8 @@ object MasterFailureTest extends Logging {
}
} catch {
case e: Exception => logError("Error running streaming context", e)
+ } finally {
+ ssc.stop()
}
if (killingThread.isAlive) {
killingThread.interrupt()
@@ -250,7 +252,6 @@ object MasterFailureTest extends Logging {
// to null after the next test creates the new SparkContext and fail the test.
killingThread.join()
}
- ssc.stop()
logInfo("Has been killed = " + killed)
logInfo("Is last output generated = " + isLastOutputGenerated)