diff options
author | zsxwing <zsxwing@gmail.com> | 2015-02-04 00:40:28 -0800 |
---|---|---|
committer | Tathagata Das <tdas@databricks.com> | 2015-02-04 00:40:51 -0800 |
commit | 4d3dbfda3a4414cc18a0d43619391f7b75d48eaf (patch) | |
tree | 4a2d91a906050ed019b83ab6b3cc0cce058e97d6 /python | |
parent | 3b7acd22ab4a134c74746e3b9a803dbd34d43855 (diff) | |
download | spark-4d3dbfda3a4414cc18a0d43619391f7b75d48eaf.tar.gz spark-4d3dbfda3a4414cc18a0d43619391f7b75d48eaf.tar.bz2 spark-4d3dbfda3a4414cc18a0d43619391f7b75d48eaf.zip |
[SPARK-5379][Streaming] Add awaitTerminationOrTimeout
Added `awaitTerminationOrTimeout` to return if the waiting time elapsed:
* `true` if it's stopped.
* `false` if the waiting time elapsed before returning from the method.
* throw the reported error if it's thrown during the execution.
Also deprecated `awaitTermination(timeout: Long)`.
Author: zsxwing <zsxwing@gmail.com>
Closes #4171 from zsxwing/SPARK-5379 and squashes the following commits:
c9e660b [zsxwing] Add a unit test for awaitTerminationOrTimeout
8a89f92 [zsxwing] Add awaitTerminationOrTimeout to python
cdc820b [zsxwing] Add awaitTerminationOrTimeout
(cherry picked from commit 4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f)
Signed-off-by: Tathagata Das <tdas@databricks.com>
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/streaming/context.py | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 18aaae93b0..b06ab65037 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -191,6 +191,15 @@ class StreamingContext(object): else: self._jssc.awaitTermination(int(timeout * 1000)) + def awaitTerminationOrTimeout(self, timeout): + """ + Wait for the execution to stop. Return `true` if it's stopped; or + throw the reported error during the execution; or `false` if the + waiting time elapsed before returning from the method. + @param timeout: time to wait in seconds + """ + self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) + def stop(self, stopSparkContext=True, stopGraceFully=False): """ Stop the execution of the streams, with option of ensuring all |