aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/streaming.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--python/pyspark/sql/streaming.py79
1 files changed, 39 insertions, 40 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1d650946be..ae45c99e4f 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod
from pyspark import since
from pyspark.rdd import ignore_unicode_prefix
-__all__ = ["ContinuousQuery"]
+__all__ = ["StreamingQuery"]
-class ContinuousQuery(object):
+class StreamingQuery(object):
"""
A handle to a query that is executing continuously in the background as new data arrives.
All these methods are thread-safe.
@@ -39,30 +39,30 @@ class ContinuousQuery(object):
.. versionadded:: 2.0
"""
- def __init__(self, jcq):
- self._jcq = jcq
+ def __init__(self, jsq):
+ self._jsq = jsq
@property
@since(2.0)
def id(self):
- """The id of the continuous query. This id is unique across all queries that have been
+ """The id of the streaming query. This id is unique across all queries that have been
started in the current process.
"""
- return self._jcq.id()
+ return self._jsq.id()
@property
@since(2.0)
def name(self):
- """The name of the continuous query. This name is unique across all active queries.
+ """The name of the streaming query. This name is unique across all active queries.
"""
- return self._jcq.name()
+ return self._jsq.name()
@property
@since(2.0)
def isActive(self):
- """Whether this continuous query is currently active or not.
+ """Whether this streaming query is currently active or not.
"""
- return self._jcq.isActive()
+ return self._jsq.isActive()
@since(2.0)
def awaitTermination(self, timeout=None):
@@ -75,14 +75,14 @@ class ContinuousQuery(object):
immediately (if the query was terminated by :func:`stop()`), or throw the exception
immediately (if the query has terminated with exception).
- throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+ throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jcq.awaitTermination(int(timeout * 1000))
+ return self._jsq.awaitTermination(int(timeout * 1000))
else:
- return self._jcq.awaitTermination()
+ return self._jsq.awaitTermination()
@since(2.0)
def processAllAvailable(self):
@@ -92,26 +92,25 @@ class ContinuousQuery(object):
until data that has been synchronously appended data to a stream source prior to invocation.
(i.e. `getOffset` must immediately reflect the addition).
"""
- return self._jcq.processAllAvailable()
+ return self._jsq.processAllAvailable()
@since(2.0)
def stop(self):
- """Stop this continuous query.
+ """Stop this streaming query.
"""
- self._jcq.stop()
+ self._jsq.stop()
-class ContinuousQueryManager(object):
- """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active
- on a :class:`SQLContext`.
+class StreamingQueryManager(object):
+ """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
.. note:: Experimental
.. versionadded:: 2.0
"""
- def __init__(self, jcqm):
- self._jcqm = jcqm
+ def __init__(self, jsqm):
+ self._jsqm = jsqm
@property
@ignore_unicode_prefix
@@ -119,14 +118,14 @@ class ContinuousQueryManager(object):
def active(self):
"""Returns a list of active queries associated with this SQLContext
- >>> cq = df.writeStream.format('memory').queryName('this_query').start()
- >>> cqm = spark.streams
- >>> # get the list of active continuous queries
- >>> [q.name for q in cqm.active]
+ >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sqm = spark.streams
+ >>> # get the list of active streaming queries
+ >>> [q.name for q in sqm.active]
[u'this_query']
- >>> cq.stop()
+ >>> sq.stop()
"""
- return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+ return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
@ignore_unicode_prefix
@since(2.0)
@@ -134,20 +133,20 @@ class ContinuousQueryManager(object):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> cq = df.writeStream.format('memory').queryName('this_query').start()
- >>> cq.name
+ >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.name
u'this_query'
- >>> cq = spark.streams.get(cq.id)
- >>> cq.isActive
+ >>> sq = spark.streams.get(sq.id)
+ >>> sq.isActive
True
- >>> cq = sqlContext.streams.get(cq.id)
- >>> cq.isActive
+ >>> sq = sqlContext.streams.get(sq.id)
+ >>> sq.isActive
True
- >>> cq.stop()
+ >>> sq.stop()
"""
if not isinstance(id, intlike):
raise ValueError("The id for the query must be an integer. Got: %s" % id)
- return ContinuousQuery(self._jcqm.get(id))
+ return StreamingQuery(self._jsqm.get(id))
@since(2.0)
def awaitAnyTermination(self, timeout=None):
@@ -168,14 +167,14 @@ class ContinuousQueryManager(object):
queries, users need to stop all of them after any of them terminates with exception, and
then check the `query.exception()` for each query.
- throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+ throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jcqm.awaitAnyTermination(int(timeout * 1000))
+ return self._jsqm.awaitAnyTermination(int(timeout * 1000))
else:
- return self._jcqm.awaitAnyTermination()
+ return self._jsqm.awaitAnyTermination()
@since(2.0)
def resetTerminated(self):
@@ -184,11 +183,11 @@ class ContinuousQueryManager(object):
>>> spark.streams.resetTerminated()
"""
- self._jcqm.resetTerminated()
+ self._jsqm.resetTerminated()
class Trigger(object):
- """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.
+ """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
.. note:: Experimental