From 4d9b0ab420df383869fa586b229ac00f234b8749 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 11 Jan 2014 23:35:51 -0800 Subject: Added waitForStop and stop to JavaStreamingContext. --- .../apache/spark/streaming/StreamingContext.scala | 5 +++-- .../streaming/api/java/JavaStreamingContext.scala | 21 ++++++++++++++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b20dbdd8cc..7b2a7d5211 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -432,7 +432,7 @@ class StreamingContext private[streaming] ( /** * Wait for the execution to stop. Any exceptions that occurs during the execution - * will be thrown here. + * will be thrown in this thread. */ def waitForStop() { waiter.waitForStopOrError() @@ -440,7 +440,7 @@ class StreamingContext private[streaming] ( /** * Wait for the execution to stop. Any exceptions that occurs during the execution - * will be thrown here. + * will be thrown in this thread. * @param timeout time to wait */ def waitForStop(timeout: Long) { @@ -449,6 +449,7 @@ class StreamingContext private[streaming] ( /** * Stop the execution of the streams. + * @param stopSparkContext Stop the associated SparkContext or not */ def stop(stopSparkContext: Boolean = true) = synchronized { scheduler.stop() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 523173d45a..ea7f7da6f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -483,9 +483,28 @@ class JavaStreamingContext(val ssc: StreamingContext) { def start() = ssc.start() /** - * Stop the execution of the streams. + * Wait for the execution to stop. Any exceptions that occurs during the execution + * will be thrown in this thread. + */ + def waitForStop() = ssc.waitForStop() + + /** + * Wait for the execution to stop. Any exceptions that occurs during the execution + * will be thrown in this thread. + * @param timeout time to wait + */ + def waitForStop(timeout: Long) = ssc.waitForStop(timeout) + + /** + * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. */ def stop() = ssc.stop() + + /** + * Stop the execution of the streams. + * @param stopSparkContext Stop the associated SparkContext or not + */ + def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext) } /** -- cgit v1.2.3