diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-12 17:36:49 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-12 17:36:49 -0800 |
commit | d1820fef574e8f559d8fba3995e21216033be303 (patch) | |
tree | baeab5a0e06cce68da9927940401d642e6013115 /streaming/src/test | |
parent | 448aef6790caa3728bcc43f518afb69807597c39 (diff) | |
parent | c7fabb745b26b42bb4a4609edcb43019fcbd68aa (diff) | |
download | spark-d1820fef574e8f559d8fba3995e21216033be303.tar.gz spark-d1820fef574e8f559d8fba3995e21216033be303.tar.bz2 spark-d1820fef574e8f559d8fba3995e21216033be303.zip |
Merge branch 'error-handling' into dstream-move
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 26 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 5 |
2 files changed, 21 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index e0232c70a8..a4d0f9f978 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.stop() ssc = null } + if (sc != null) { + sc.stop() + sc = null + } } test("from no conf constructor") { @@ -125,6 +129,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop multiple times") { ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register + ssc.start() ssc.stop() ssc.stop() ssc = null @@ -132,12 +138,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) + sc = ssc.sparkContext + addInputStream(ssc).register + ssc.start() ssc.stop(false) ssc = null assert(sc.makeRDD(1 to 100).collect().size === 100) + ssc = new StreamingContext(sc, batchDuration) } - test("waitForStop") { + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => x).register @@ -149,13 +159,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { // test whether waitForStop() exits after give amount of time failAfter(1000 millis) { - ssc.waitForStop(500) + ssc.awaitTermination(500) } // test whether waitForStop() does not exit if not time is given val exception = intercept[Exception] { failAfter(1000 millis) { - ssc.waitForStop() + ssc.awaitTermination() throw new Exception("Did not wait for stop") } } @@ -169,11 +179,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.stop() } }.start() - ssc.waitForStop() + ssc.awaitTermination() } } - test("waitForStop with error in task") { + test("awaitTermination with error in task") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => { throw new TestException("error in map task"); x}) @@ -181,19 +191,19 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { val exception = intercept[Exception] { ssc.start() - ssc.waitForStop(5000) + ssc.awaitTermination(5000) } assert(exception.getMessage.contains("map task"), "Expected exception not thrown") } - test("waitForStop with error in job generation") { + test("awaitTermination with error in job generation") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register val exception = intercept[TestException] { ssc.start() - ssc.waitForStop(5000) + ssc.awaitTermination(5000) } assert(exception.getMessage.contains("transform"), "Expected exception not thrown") } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 75093d6106..d56dd1abe3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(10) + ssc.awaitTermination(50) } val timeTaken = System.currentTimeMillis() - startTime - + logInfo("Output generated in " + timeTaken + " milliseconds") + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") |