Diffstat (limited to 'python/pyspark/sql')
-rw-r--r--  python/pyspark/sql/context.py     2
-rw-r--r--  python/pyspark/sql/session.py    11
-rw-r--r--  python/pyspark/sql/streaming.py  61
3 files changed, 54 insertions, 20 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 486733a390..60f62b219b 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -433,6 +433,8 @@ class SQLContext(object):
     def streams(self):
         """Returns a :class:`ContinuousQueryManager` that allows managing all the
         :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+
+        .. note:: Experimental.
         """
         from pyspark.sql.streaming import ContinuousQueryManager
         return ContinuousQueryManager(self._ssql_ctx.streams())
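
For orientation, here is a minimal end-to-end sketch of the manager this property exposes. It is hedged: it assumes a Spark 2.0-era build where the experimental DataFrameReader.stream() and DataFrameWriter.startStream() APIs exist exactly as the doctests in this patch use them, and it borrows the patch's own text-file fixture path.

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext('local[4]', 'streams-demo')
    sqlContext = SQLContext(sc)

    # Start a query against a directory of text files (the path is the
    # fixture the doctests in this patch use; substitute your own source).
    df = sqlContext.read.format('text').stream('python/test_support/sql/streaming')
    cq = df.write.format('memory').queryName('demo_query').startStream()

    # The manager returned by the new property lists every active query.
    for q in sqlContext.streams.active:
        print(q.id, q.name, q.isActive)

    cq.stop()
    sc.stop()
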
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 7c9f532f94..f0bf0923b8 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -549,6 +549,17 @@ class SparkSession(object):
"""
return DataFrameReader(self._wrapped)
+ @property
+ @since(2.0)
+ def streams(self):
+ """Returns a :class:`ContinuousQueryManager` that allows managing all the
+ :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+
+ .. note:: Experimental.
+ """
+ from pyspark.sql.streaming import ContinuousQueryManager
+ return ContinuousQueryManager(self._jsparkSession.streams())
+
@since(2.0)
def stop(self):
"""Stop the underlying :class:`SparkContext`.
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 580aba651f..bb4e62cdd6 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -15,6 +15,12 @@
 # limitations under the License.
 #
 
+import sys
+if sys.version >= '3':
+    intlike = int
+else:
+    intlike = (int, long)
+
 from abc import ABCMeta, abstractmethod
 
 from pyspark import since
+from pyspark.rdd import ignore_unicode_prefix
@@ -38,8 +44,16 @@ class ContinuousQuery(object):
 
     @property
     @since(2.0)
+    def id(self):
+        """The id of the continuous query. This id is unique across all queries that have been
+        started in the current process.
+        """
+        return self._jcq.id()
+
+    @property
+    @since(2.0)
     def name(self):
-        """The name of the continuous query.
+        """The name of the continuous query. This name is unique across all active queries.
         """
         return self._jcq.name()
@@ -106,7 +120,7 @@ class ContinuousQueryManager(object):
"""Returns a list of active queries associated with this SQLContext
>>> cq = df.write.format('memory').queryName('this_query').startStream()
- >>> cqm = sqlContext.streams
+ >>> cqm = spark.streams
>>> # get the list of active continuous queries
>>> [q.name for q in cqm.active]
[u'this_query']
@@ -114,20 +128,26 @@ class ContinuousQueryManager(object):
"""
return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+ @ignore_unicode_prefix
@since(2.0)
- def get(self, name):
+ def get(self, id):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> df.write.format('memory').queryName('this_query').startStream()
- >>> cq = sqlContext.streams.get('this_query')
+ >>> cq = df.write.format('memory').queryName('this_query').startStream()
+ >>> cq.name
+ u'this_query'
+ >>> cq = spark.streams.get(cq.id)
+ >>> cq.isActive
+ True
+ >>> cq = sqlContext.streams.get(cq.id)
>>> cq.isActive
True
>>> cq.stop()
"""
- if type(name) != str or len(name.strip()) == 0:
- raise ValueError("The name for the query must be a non-empty string. Got: %s" % name)
- return ContinuousQuery(self._jcqm.get(name))
+ if not isinstance(id, intlike):
+ raise ValueError("The id for the query must be an integer. Got: %d" % id)
+ return ContinuousQuery(self._jcqm.get(id))
@since(2.0)
def awaitAnyTermination(self, timeout=None):
@@ -162,7 +182,7 @@ class ContinuousQueryManager(object):
"""Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
again to wait for new terminations.
- >>> sqlContext.streams.resetTerminated()
+ >>> spark.streams.resetTerminated()
"""
self._jcqm.resetTerminated()
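
resetTerminated() pairs with awaitAnyTermination() above. A sketch of the intended wait/reset pattern, reusing the `spark` session and streaming `df` from the surrounding doctests; the timeout is assumed to be in seconds, matching this module's other Python wrappers:

    # Clear any remembered terminations before a fresh round of waiting.
    spark.streams.resetTerminated()

    q1 = df.write.format('memory').queryName('q1').startStream()
    q2 = df.write.format('memory').queryName('q2').startStream()
    q1.stop()

    # Returns once any query has terminated since the last reset
    # (here q1 already has), or when the timeout elapses.
    spark.streams.awaitAnyTermination(timeout=10)
    q2.stop()
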
@@ -209,27 +229,28 @@ def _test():
     import doctest
     import os
     import tempfile
-    from pyspark.context import SparkContext
-    from pyspark.sql import Row, SQLContext, HiveContext
-    import pyspark.sql.readwriter
+    import py4j
+    from pyspark.context import SparkContext
+    from pyspark.sql import Row, SparkSession, SQLContext
+    import pyspark.sql.streaming
 
     os.chdir(os.environ["SPARK_HOME"])
 
-    globs = pyspark.sql.readwriter.__dict__.copy()
+    globs = pyspark.sql.streaming.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+    try:
+        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+    except py4j.protocol.Py4JError:
+        spark = SparkSession(sc)
     globs['tempfile'] = tempfile
     globs['os'] = os
-    globs['sc'] = sc
-    globs['sqlContext'] = SQLContext(sc)
-    globs['hiveContext'] = HiveContext._createForTesting(sc)
+    globs['spark'] = spark
+    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
     globs['df'] = \
-        globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
+        globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
     (failure_count, test_count) = doctest.testmod(
-        pyspark.sql.readwriter, globs=globs,
+        pyspark.sql.streaming, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
-    globs['sc'].stop()
+    globs['spark'].stop()
     if failure_count:
         exit(-1)
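
Finally, a standalone illustration of the Python 2/3 integer shim this patch adds at the top of streaming.py. check_id is a hypothetical helper, not part of the patch; it just mirrors the isinstance() validation in ContinuousQueryManager.get(id):

    import sys

    if sys.version >= '3':
        intlike = int
    else:
        intlike = (int, long)  # noqa: F821 -- long exists only on Python 2

    def check_id(id):
        # Mirrors the type check in ContinuousQueryManager.get(id):
        # accept only integer-like ids, never query names.
        if not isinstance(id, intlike):
            raise ValueError("The id for the query must be an integer. Got: %s" % id)
        return id

    check_id(42)      # accepted
    # check_id('42')  # would raise ValueError
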