From c7fabb745b26b42bb4a4609edcb43019fcbd68aa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 12 Jan 2014 17:21:13 -0800 Subject: Changed StreamingContext.stopForWait to awaitTermination. --- .../org/apache/spark/streaming/StreamingContext.scala | 6 +++--- .../spark/streaming/api/java/JavaStreamingContext.scala | 6 +++--- .../apache/spark/streaming/StreamingContextSuite.scala | 16 ++++++++-------- .../scala/org/apache/spark/streaming/TestSuiteBase.scala | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7b2a7d5211..ee83ae902b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -434,16 +434,16 @@ class StreamingContext private[streaming] ( * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. */ - def waitForStop() { + def awaitTermination() { waiter.waitForStopOrError() } /** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. - * @param timeout time to wait + * @param timeout time to wait in milliseconds */ - def waitForStop(timeout: Long) { + def awaitTermination(timeout: Long) { waiter.waitForStopOrError(timeout) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index ea7f7da6f3..b4c46f5e50 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -486,14 +486,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. */ - def waitForStop() = ssc.waitForStop() + def awaitTermination() = ssc.awaitTermination() /** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. - * @param timeout time to wait + * @param timeout time to wait in milliseconds */ - def waitForStop(timeout: Long) = ssc.waitForStop(timeout) + def awaitTermination(timeout: Long) = ssc.awaitTermination(timeout) /** * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 10c18a7f7e..a477d200c9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -146,7 +146,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc = new StreamingContext(sc, batchDuration) } - test("waitForStop") { + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => x).register @@ -158,13 +158,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { // test whether waitForStop() exits after give amount of time failAfter(1000 millis) { - ssc.waitForStop(500) + ssc.awaitTermination(500) } // test whether waitForStop() does not exit if not time is given val exception = intercept[Exception] { failAfter(1000 millis) { - ssc.waitForStop() + ssc.awaitTermination() throw new Exception("Did not wait for stop") } } @@ -178,11 +178,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.stop() } }.start() - ssc.waitForStop() + ssc.awaitTermination() } } - test("waitForStop with error in task") { + test("awaitTermination with error in task") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => { throw new TestException("error in map task"); x}) @@ -190,19 +190,19 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { val exception = intercept[Exception] { ssc.start() - ssc.waitForStop(5000) + ssc.awaitTermination(5000) } assert(exception.getMessage.contains("map task"), "Expected exception not thrown") } - test("waitForStop with error in job generation") { + test("awaitTermination with error in job generation") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register val exception = intercept[TestException] { ssc.start() - ssc.waitForStop(5000) + ssc.awaitTermination(5000) } assert(exception.getMessage.contains("transform"), "Expected exception not thrown") } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index a8ff444109..63a07cfbdf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -273,7 +273,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - ssc.waitForStop(50) + ssc.awaitTermination(50) } val timeTaken = System.currentTimeMillis() - startTime logInfo("Output generated in " + timeTaken + " milliseconds") -- cgit v1.2.3