diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-12 16:44:07 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-12 16:44:07 -0800 |
commit | 7883b8f5798e3de6f55a1182a5d5775c4aaa783b (patch) | |
tree | 8ca20c15e797c9b81778da986ab022dc9af2d30c /streaming/src/test | |
parent | c5921e5c6184ddc99c12c0b1f2646b6bd74a9e98 (diff) | |
download | spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.tar.gz spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.tar.bz2 spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.zip |
Fixed bugs to ensure better cleanup of JobScheduler, JobGenerator and NetworkInputTracker upon close.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 10 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 5 |
2 files changed, 13 insertions, 2 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9eb9b3684c..10c18a7f7e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.stop() ssc = null } + if (sc != null) { + sc.stop() + sc = null + } } test("from no conf constructor") { @@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop multiple times") { ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register + ssc.start() ssc.stop() ssc.stop() ssc = null @@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) + sc = ssc.sparkContext + addInputStream(ssc).register + ssc.start() ssc.stop(false) ssc = null assert(sc.makeRDD(1 to 100).collect().size === 100) + ssc = new StreamingContext(sc, batchDuration) } test("waitForStop") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 3569624d51..a8ff444109 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(10) + ssc.waitForStop(50) } val timeTaken = System.currentTimeMillis() - startTime - + logInfo("Output generated in " + timeTaken + " milliseconds") + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") |