From 7883b8f5798e3de6f55a1182a5d5775c4aaa783b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 12 Jan 2014 16:44:07 -0800 Subject: Fixed bugs to ensure better cleanup of JobScheduler, JobGenerator and NetworkInputTracker upon close. --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 10 ++++++++++ .../test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 5 +++-- 2 files changed, 13 insertions(+), 2 deletions(-) (limited to 'streaming/src/test') diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9eb9b3684c..10c18a7f7e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.stop() ssc = null } + if (sc != null) { + sc.stop() + sc = null + } } test("from no conf constructor") { @@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop multiple times") { ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register + ssc.start() ssc.stop() ssc.stop() ssc = null @@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) + sc = ssc.sparkContext + addInputStream(ssc).register + ssc.start() ssc.stop(false) ssc = null assert(sc.makeRDD(1 to 100).collect().size === 100) + ssc = new StreamingContext(sc, batchDuration) } test("waitForStop") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 3569624d51..a8ff444109 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(10) + ssc.waitForStop(50) } val timeTaken = System.currentTimeMillis() - startTime - + logInfo("Output generated in " + timeTaken + " milliseconds") + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") -- cgit v1.2.3