aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala21
2 files changed, 23 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b20dbdd8cc..7b2a7d5211 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -432,7 +432,7 @@ class StreamingContext private[streaming] (
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown here.
+ * will be thrown in this thread.
*/
def waitForStop() {
waiter.waitForStopOrError()
@@ -440,7 +440,7 @@ class StreamingContext private[streaming] (
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown here.
+ * will be thrown in this thread.
* @param timeout time to wait
*/
def waitForStop(timeout: Long) {
@@ -449,6 +449,7 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams.
+ * @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean = true) = synchronized {
scheduler.stop()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 523173d45a..ea7f7da6f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -483,9 +483,28 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def start() = ssc.start()
/**
- * Stop the execution of the streams.
+ * Wait for the execution to stop. Any exceptions that occurs during the execution
+ * will be thrown in this thread.
+ */
+ def waitForStop() = ssc.waitForStop()
+
+ /**
+ * Wait for the execution to stop. Any exceptions that occurs during the execution
+ * will be thrown in this thread.
+ * @param timeout time to wait
+ */
+ def waitForStop(timeout: Long) = ssc.waitForStop(timeout)
+
+ /**
+ * Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
*/
def stop() = ssc.stop()
+
+ /**
+ * Stop the execution of the streams.
+ * @param stopSparkContext Stop the associated SparkContext or not
+ */
+ def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
}
/**