aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-11 23:35:51 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-11 23:35:51 -0800
commit4d9b0ab420df383869fa586b229ac00f234b8749 (patch)
tree42e1a109e82a50478a063a6b6941c7f716babea3 /streaming/src
parentf5108ffc24eccd21f5d6dc4114ea47b0ab14ab14 (diff)
downloadspark-4d9b0ab420df383869fa586b229ac00f234b8749.tar.gz
spark-4d9b0ab420df383869fa586b229ac00f234b8749.tar.bz2
spark-4d9b0ab420df383869fa586b229ac00f234b8749.zip
Added waitForStop and stop to JavaStreamingContext.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala21
2 files changed, 23 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b20dbdd8cc..7b2a7d5211 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -432,7 +432,7 @@ class StreamingContext private[streaming] (
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown here.
+ * will be thrown in this thread.
*/
def waitForStop() {
waiter.waitForStopOrError()
@@ -440,7 +440,7 @@ class StreamingContext private[streaming] (
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown here.
+ * will be thrown in this thread.
* @param timeout time to wait
*/
def waitForStop(timeout: Long) {
@@ -449,6 +449,7 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams.
+ * @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean = true) = synchronized {
scheduler.stop()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 523173d45a..ea7f7da6f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -483,9 +483,28 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def start() = ssc.start()
/**
- * Stop the execution of the streams.
+ * Wait for the execution to stop. Any exceptions that occurs during the execution
+ * will be thrown in this thread.
+ */
+ def waitForStop() = ssc.waitForStop()
+
+ /**
+ * Wait for the execution to stop. Any exceptions that occurs during the execution
+ * will be thrown in this thread.
+ * @param timeout time to wait
+ */
+ def waitForStop(timeout: Long) = ssc.waitForStop(timeout)
+
+ /**
+ * Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
*/
def stop() = ssc.stop()
+
+ /**
+ * Stop the execution of the streams.
+ * @param stopSparkContext Stop the associated SparkContext or not
+ */
+ def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
}
/**