aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-02-04 00:40:28 -0800
committerTathagata Das <tdas@databricks.com>2015-02-04 00:40:28 -0800
commit4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f (patch)
tree4a2d91a906050ed019b83ab6b3cc0cce058e97d6 /streaming
parent6aed719e503afa48820f6e3e798da483649dfcb9 (diff)
downloadspark-4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f.tar.gz
spark-4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f.tar.bz2
spark-4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f.zip
[SPARK-5379][Streaming] Add awaitTerminationOrTimeout
Added `awaitTerminationOrTimeout` to return if the waiting time elapsed: * `true` if it's stopped. * `false` if the waiting time elapsed before returning from the method. * throw the reported error if it's thrown during the execution. Also deprecated `awaitTermination(timeout: Long)`. Author: zsxwing <zsxwing@gmail.com> Closes #4171 from zsxwing/SPARK-5379 and squashes the following commits: c9e660b [zsxwing] Add a unit test for awaitTerminationOrTimeout 8a89f92 [zsxwing] Add awaitTerminationOrTimeout to python cdc820b [zsxwing] Add awaitTerminationOrTimeout
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala13
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala24
3 files changed, 50 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ddc435cf1a..ba3f23434f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -526,11 +526,24 @@ class StreamingContext private[streaming] (
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
+ @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
def awaitTermination(timeout: Long) {
waiter.waitForStopOrError(timeout)
}
/**
+ * Wait for the execution to stop. Any exceptions that occurs during the execution
+ * will be thrown in this thread.
+ *
+ * @param timeout time to wait in milliseconds
+ * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
+ * if the waiting time elapsed before returning from the method.
+ */
+ def awaitTerminationOrTimeout(timeout: Long): Boolean = {
+ waiter.waitForStopOrError(timeout)
+ }
+
+ /**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed).
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 0f7ae7a1c7..e3db01c1e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -597,11 +597,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
+ @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
def awaitTermination(timeout: Long): Unit = {
ssc.awaitTermination(timeout)
}
/**
+ * Wait for the execution to stop. Any exceptions that occurs during the execution
+ * will be thrown in this thread.
+ *
+ * @param timeout time to wait in milliseconds
+ * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
+ * if the waiting time elapsed before returning from the method.
+ */
+ def awaitTerminationOrTimeout(timeout: Long): Boolean = {
+ ssc.awaitTerminationOrTimeout(timeout)
+ }
+
+ /**
* Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
*/
def stop(): Unit = {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 0b5af25e0f..2aa5e0876b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -304,6 +304,30 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
}
+ test("awaitTerminationOrTimeout") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val inputStream = addInputStream(ssc)
+ inputStream.map(x => x).register()
+
+ ssc.start()
+
+ // test whether awaitTerminationOrTimeout() return false after give amount of time
+ failAfter(1000 millis) {
+ assert(ssc.awaitTerminationOrTimeout(500) === false)
+ }
+
+ // test whether awaitTerminationOrTimeout() return true if context is stopped
+ failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
+ new Thread() {
+ override def run() {
+ Thread.sleep(500)
+ ssc.stop()
+ }
+ }.start()
+ assert(ssc.awaitTerminationOrTimeout(10000) === true)
+ }
+ }
+
test("DStream and generated RDD creation sites") {
testPackage.test()
}