aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-12 16:44:07 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-12 16:44:07 -0800
commit7883b8f5798e3de6f55a1182a5d5775c4aaa783b (patch)
tree8ca20c15e797c9b81778da986ab022dc9af2d30c /streaming/src/test
parentc5921e5c6184ddc99c12c0b1f2646b6bd74a9e98 (diff)
downloadspark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.tar.gz
spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.tar.bz2
spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.zip
Fixed bugs to ensure better cleanup of JobScheduler, JobGenerator and NetworkInputTracker upon close.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala10
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala5
2 files changed, 13 insertions, 2 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9eb9b3684c..10c18a7f7e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc.stop()
ssc = null
}
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
}
test("from no conf constructor") {
@@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register
+ ssc.start()
ssc.stop()
ssc.stop()
ssc = null
@@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
+ sc = ssc.sparkContext
+ addInputStream(ssc).register
+ ssc.start()
ssc.stop(false)
ssc = null
assert(sc.makeRDD(1 to 100).collect().size === 100)
+ ssc = new StreamingContext(sc, batchDuration)
}
test("waitForStop") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 3569624d51..a8ff444109 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- Thread.sleep(10)
+ ssc.waitForStop(50)
}
val timeTaken = System.currentTimeMillis() - startTime
-
+ logInfo("Output generated in " + timeTaken + " milliseconds")
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")