diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-06-15 10:46:02 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-06-15 10:46:07 -0700 |
commit | 9a5071996b968148f6b9aba12e0d3fe888d9acd8 (patch) | |
tree | 1f39691de83edb31dca56d6cd460261070504248 /python/pyspark/sql/streaming.py | |
parent | d30b7e6696e20f1014c7f26aadbc051da0fac578 (diff) | |
download | spark-9a5071996b968148f6b9aba12e0d3fe888d9acd8.tar.gz spark-9a5071996b968148f6b9aba12e0d3fe888d9acd8.tar.bz2 spark-9a5071996b968148f6b9aba12e0d3fe888d9acd8.zip |
[SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery
Renamed for simplicity, so that its obvious that its related to streaming.
Existing unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #13673 from tdas/SPARK-15953.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r-- | python/pyspark/sql/streaming.py | 79 |
1 files changed, 39 insertions, 40 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1d650946be..ae45c99e4f 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod from pyspark import since from pyspark.rdd import ignore_unicode_prefix -__all__ = ["ContinuousQuery"] +__all__ = ["StreamingQuery"] -class ContinuousQuery(object): +class StreamingQuery(object): """ A handle to a query that is executing continuously in the background as new data arrives. All these methods are thread-safe. @@ -39,30 +39,30 @@ class ContinuousQuery(object): .. versionadded:: 2.0 """ - def __init__(self, jcq): - self._jcq = jcq + def __init__(self, jsq): + self._jsq = jsq @property @since(2.0) def id(self): - """The id of the continuous query. This id is unique across all queries that have been + """The id of the streaming query. This id is unique across all queries that have been started in the current process. """ - return self._jcq.id() + return self._jsq.id() @property @since(2.0) def name(self): - """The name of the continuous query. This name is unique across all active queries. + """The name of the streaming query. This name is unique across all active queries. """ - return self._jcq.name() + return self._jsq.name() @property @since(2.0) def isActive(self): - """Whether this continuous query is currently active or not. + """Whether this streaming query is currently active or not. """ - return self._jcq.isActive() + return self._jsq.isActive() @since(2.0) def awaitTermination(self, timeout=None): @@ -75,14 +75,14 @@ class ContinuousQuery(object): immediately (if the query was terminated by :func:`stop()`), or throw the exception immediately (if the query has terminated with exception). - throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jcq.awaitTermination(int(timeout * 1000)) + return self._jsq.awaitTermination(int(timeout * 1000)) else: - return self._jcq.awaitTermination() + return self._jsq.awaitTermination() @since(2.0) def processAllAvailable(self): @@ -92,26 +92,25 @@ class ContinuousQuery(object): until data that has been synchronously appended data to a stream source prior to invocation. (i.e. `getOffset` must immediately reflect the addition). """ - return self._jcq.processAllAvailable() + return self._jsq.processAllAvailable() @since(2.0) def stop(self): - """Stop this continuous query. + """Stop this streaming query. """ - self._jcq.stop() + self._jsq.stop() -class ContinuousQueryManager(object): - """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active - on a :class:`SQLContext`. +class StreamingQueryManager(object): + """A class to manage all the :class:`StreamingQuery` StreamingQueries active. .. note:: Experimental .. versionadded:: 2.0 """ - def __init__(self, jcqm): - self._jcqm = jcqm + def __init__(self, jsqm): + self._jsqm = jsqm @property @ignore_unicode_prefix @@ -119,14 +118,14 @@ class ContinuousQueryManager(object): def active(self): """Returns a list of active queries associated with this SQLContext - >>> cq = df.writeStream.format('memory').queryName('this_query').start() - >>> cqm = spark.streams - >>> # get the list of active continuous queries - >>> [q.name for q in cqm.active] + >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sqm = spark.streams + >>> # get the list of active streaming queries + >>> [q.name for q in sqm.active] [u'this_query'] - >>> cq.stop() + >>> sq.stop() """ - return [ContinuousQuery(jcq) for jcq in self._jcqm.active()] + return [StreamingQuery(jsq) for jsq in self._jsqm.active()] @ignore_unicode_prefix @since(2.0) @@ -134,20 +133,20 @@ class ContinuousQueryManager(object): """Returns an active query from this SQLContext or throws exception if an active query with this name doesn't exist. - >>> cq = df.writeStream.format('memory').queryName('this_query').start() - >>> cq.name + >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sq.name u'this_query' - >>> cq = spark.streams.get(cq.id) - >>> cq.isActive + >>> sq = spark.streams.get(sq.id) + >>> sq.isActive True - >>> cq = sqlContext.streams.get(cq.id) - >>> cq.isActive + >>> sq = sqlContext.streams.get(sq.id) + >>> sq.isActive True - >>> cq.stop() + >>> sq.stop() """ if not isinstance(id, intlike): raise ValueError("The id for the query must be an integer. Got: %s" % id) - return ContinuousQuery(self._jcqm.get(id)) + return StreamingQuery(self._jsqm.get(id)) @since(2.0) def awaitAnyTermination(self, timeout=None): @@ -168,14 +167,14 @@ class ContinuousQueryManager(object): queries, users need to stop all of them after any of them terminates with exception, and then check the `query.exception()` for each query. - throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jcqm.awaitAnyTermination(int(timeout * 1000)) + return self._jsqm.awaitAnyTermination(int(timeout * 1000)) else: - return self._jcqm.awaitAnyTermination() + return self._jsqm.awaitAnyTermination() @since(2.0) def resetTerminated(self): @@ -184,11 +183,11 @@ class ContinuousQueryManager(object): >>> spark.streams.resetTerminated() """ - self._jcqm.resetTerminated() + self._jsqm.resetTerminated() class Trigger(object): - """Used to indicate how often results should be produced by a :class:`ContinuousQuery`. + """Used to indicate how often results should be produced by a :class:`StreamingQuery`. .. note:: Experimental |