Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--  python/pyspark/sql/streaming.py | 61
1 file changed, 41 insertions, 20 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 580aba651f..bb4e62cdd6 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -15,6 +15,12 @@
 # limitations under the License.
 #
 
+import sys
+if sys.version >= '3':
+    intlike = int
+else:
+    intlike = (int, long)
+
 from abc import ABCMeta, abstractmethod
 
 from pyspark import since
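
The intlike alias added in this hunk papers over the Python 2/3 integer split: Python 3 has a single int type, while on Python 2 an id may be an int or a long. A minimal sketch of how the alias is meant to be used in an isinstance check; the helper name _require_int_id is hypothetical and not part of the patch:

    import sys

    # Mirror of the compatibility alias added above: one integer type on Python 3,
    # two (int and long) on Python 2.
    if sys.version >= '3':
        intlike = int
    else:
        intlike = (int, long)  # `long` only exists on Python 2

    def _require_int_id(id):
        # Hypothetical helper: reject anything that is not a plain integer id.
        if not isinstance(id, intlike):
            raise ValueError("The id for the query must be an integer. Got: %s" % id)
        return id
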
@@ -38,8 +44,16 @@ class ContinuousQuery(object):
 
     @property
     @since(2.0)
+    def id(self):
+        """The id of the continuous query. This id is unique across all queries that have been
+        started in the current process.
+        """
+        return self._jcq.id()
+
+    @property
+    @since(2.0)
     def name(self):
-        """The name of the continuous query.
+        """The name of the continuous query. This name is unique across all active queries.
         """
         return self._jcq.name()
 
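
A usage sketch (not part of the patch) for the new id property next to the existing name; it assumes an existing SparkSession `spark` and a streaming DataFrame `df`, and uses the startStream/memory-sink API names shown in this file, which may differ in other Spark versions:

    # `df` is assumed to be a streaming DataFrame, e.g. created with
    # spark.read.format('text').stream('python/test_support/sql/streaming').
    cq = df.write.format('memory').queryName('my_query').startStream()

    print(cq.id)    # unique across all queries started in this process
    print(cq.name)  # unique across all active queries ('my_query' here)

    cq.stop()
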
@@ -106,7 +120,7 @@ class ContinuousQueryManager(object):
"""Returns a list of active queries associated with this SQLContext
>>> cq = df.write.format('memory').queryName('this_query').startStream()
- >>> cqm = sqlContext.streams
+ >>> cqm = spark.streams
>>> # get the list of active continuous queries
>>> [q.name for q in cqm.active]
[u'this_query']
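
The same manager can be used to enumerate every active query, as in this small sketch (not part of the patch), assuming an existing SparkSession `spark`:

    cqm = spark.streams  # ContinuousQueryManager for this session

    # Print id, name and state of each active continuous query.
    for q in cqm.active:
        print(q.id, q.name, q.isActive)
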
@@ -114,20 +128,26 @@ class ContinuousQueryManager(object):
"""
return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+ @ignore_unicode_prefix
@since(2.0)
- def get(self, name):
+ def get(self, id):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> df.write.format('memory').queryName('this_query').startStream()
- >>> cq = sqlContext.streams.get('this_query')
+ >>> cq = df.write.format('memory').queryName('this_query').startStream()
+ >>> cq.name
+ u'this_query'
+ >>> cq = spark.streams.get(cq.id)
+ >>> cq.isActive
+ True
+ >>> cq = sqlContext.streams.get(cq.id)
>>> cq.isActive
True
>>> cq.stop()
"""
- if type(name) != str or len(name.strip()) == 0:
- raise ValueError("The name for the query must be a non-empty string. Got: %s" % name)
- return ContinuousQuery(self._jcqm.get(name))
+ if not isinstance(id, intlike):
+ raise ValueError("The id for the query must be an integer. Got: %d" % id)
+ return ContinuousQuery(self._jcqm.get(id))
@since(2.0)
def awaitAnyTermination(self, timeout=None):
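
With this change a query is looked up by its integer id rather than its name. A sketch of the round trip (not part of the patch), assuming `spark` and a streaming DataFrame `df` as in the doctests:

    cq = df.write.format('memory').queryName('this_query').startStream()

    query_id = cq.id                       # integer id assigned when the query started
    same_cq = spark.streams.get(query_id)  # raises if no active query has this id

    assert same_cq.isActive
    cq.stop()
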
@@ -162,7 +182,7 @@ class ContinuousQueryManager(object):
"""Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
again to wait for new terminations.
- >>> sqlContext.streams.resetTerminated()
+ >>> spark.streams.resetTerminated()
"""
self._jcqm.resetTerminated()
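
A sketch of the wait/reset cycle these manager methods support, again assuming an existing SparkSession `spark` and the awaitAnyTermination(timeout=None) signature shown above:

    # Block until any active query on this session terminates (or the optional
    # timeout elapses).
    spark.streams.awaitAnyTermination(timeout=10)

    # The manager remembers queries that have already terminated; clear that
    # record so the next awaitAnyTermination() call waits for a new termination
    # instead of returning immediately.
    spark.streams.resetTerminated()
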
@@ -209,27 +229,28 @@ def _test():
     import doctest
     import os
     import tempfile
-    from pyspark.context import SparkContext
-    from pyspark.sql import Row, SQLContext, HiveContext
-    import pyspark.sql.readwriter
+    from pyspark.sql import Row, SparkSession, SQLContext
+    import pyspark.sql.streaming
 
     os.chdir(os.environ["SPARK_HOME"])
 
-    globs = pyspark.sql.readwriter.__dict__.copy()
-    sc = SparkContext('local[4]', 'PythonTest')
+    globs = pyspark.sql.streaming.__dict__.copy()
+    try:
+        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+    except py4j.protocol.Py4JError:
+        spark = SparkSession(sc)
 
     globs['tempfile'] = tempfile
     globs['os'] = os
-    globs['sc'] = sc
-    globs['sqlContext'] = SQLContext(sc)
-    globs['hiveContext'] = HiveContext._createForTesting(sc)
+    globs['spark'] = spark
+    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
     globs['df'] = \
-        globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
+        globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
 
     (failure_count, test_count) = doctest.testmod(
-        pyspark.sql.readwriter, globs=globs,
+        pyspark.sql.streaming, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
-    globs['sc'].stop()
+    globs['spark'].stop()
     if failure_count:
         exit(-1)
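
One caveat when reusing the bootstrap pattern above: the fallback branch refers to py4j and sc, neither of which is imported or defined in the hunk as shown (this patch removes both the SparkContext import and the sc assignment), so it only works if those names are available some other way. A more self-contained variant, illustrative only and not part of the patch, could look like:

    from py4j.protocol import Py4JError
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    try:
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    except Py4JError:
        # Hive classes are unavailable: fall back to a plain SparkSession on
        # whatever SparkContext is (or can be) running.
        sc = SparkContext.getOrCreate()
        spark = SparkSession(sc)
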