author     Tathagata Das <tathagata.das1565@gmail.com>  2016-06-15 10:46:02 -0700
committer  Shixiong Zhu <shixiong@databricks.com>       2016-06-15 10:46:07 -0700
commit     9a5071996b968148f6b9aba12e0d3fe888d9acd8 (patch)
tree       1f39691de83edb31dca56d6cd460261070504248 /python
parent     d30b7e6696e20f1014c7f26aadbc051da0fac578 (diff)
[SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery
Renamed for simplicity, so that it's obvious that it's related to streaming. Tested with existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13673 from tdas/SPARK-15953.
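For orientation, here is a minimal sketch of how the renamed user-facing API reads after this patch. The SparkSession named `spark` and the input directory are assumptions for illustration, not part of the commit; the calls themselves mirror the doctests changed below.

    # Sketch only: assumes an active SparkSession `spark` and a directory of text files.
    sdf = spark.readStream.format('text').load('/tmp/streaming_input')     # hypothetical path
    sq = sdf.writeStream.format('memory').queryName('this_query').start()  # returns a StreamingQuery (was ContinuousQuery)
    print(sq.name, sq.isActive)
    sq.stop()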
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/context.py      8
-rw-r--r--  python/pyspark/sql/dataframe.py    2
-rw-r--r--  python/pyspark/sql/readwriter.py  40
-rw-r--r--  python/pyspark/sql/session.py     10
-rw-r--r--  python/pyspark/sql/streaming.py   79
-rw-r--r--  python/pyspark/sql/tests.py       52
-rw-r--r--  python/pyspark/sql/utils.py        8
7 files changed, 99 insertions, 100 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index a271afe4cf..8a1a874884 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -444,13 +444,13 @@ class SQLContext(object):
@property
@since(2.0)
def streams(self):
- """Returns a :class:`ContinuousQueryManager` that allows managing all the
- :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+ """Returns a :class:`StreamingQueryManager` that allows managing all the
+ :class:`StreamingQuery` StreamingQueries active on `this` context.
.. note:: Experimental.
"""
- from pyspark.sql.streaming import ContinuousQueryManager
- return ContinuousQueryManager(self._ssql_ctx.streams())
+ from pyspark.sql.streaming import StreamingQueryManager
+ return StreamingQueryManager(self._ssql_ctx.streams())
class HiveContext(SQLContext):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0126faf574..acf9d08b23 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -257,7 +257,7 @@ class DataFrame(object):
def isStreaming(self):
"""Returns true if this :class:`Dataset` contains one or more sources that continuously
return data as it arrives. A :class:`Dataset` that reads data from a streaming source
- must be executed as a :class:`ContinuousQuery` using the :func:`startStream` method in
+ must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in
:class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or
:func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
source present.
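A brief sketch of the `isStreaming` property referenced in the docstring above; the session name and paths are placeholders, not taken from the patch.

    # Sketch: a streaming DataFrame reports isStreaming == True, a batch one does not.
    streaming_df = spark.readStream.format('text').load('/tmp/streaming_input')  # hypothetical path
    assert streaming_df.isStreaming
    batch_df = spark.read.text('/tmp/streaming_input')                           # same data read as a batch
    assert not batch_df.isStreaming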
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ad954d0ad8..c982de6840 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
from pyspark.sql import utils
-__all__ = ["DataFrameReader", "DataFrameWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]
def to_str(value):
@@ -458,9 +458,9 @@ class DataFrameWriter(object):
self._spark = df.sql_ctx
self._jwrite = df._jdf.write()
- def _cq(self, jcq):
- from pyspark.sql.streaming import ContinuousQuery
- return ContinuousQuery(jcq)
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
@since(1.4)
def mode(self, saveMode):
@@ -1094,9 +1094,9 @@ class DataStreamWriter(object):
self._spark = df.sql_ctx
self._jwrite = df._jdf.writeStream()
- def _cq(self, jcq):
- from pyspark.sql.streaming import ContinuousQuery
- return ContinuousQuery(jcq)
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
@since(2.0)
def outputMode(self, outputMode):
@@ -1169,8 +1169,8 @@ class DataStreamWriter(object):
@since(2.0)
def queryName(self, queryName):
- """Specifies the name of the :class:`ContinuousQuery` that can be started with
- :func:`startStream`. This name must be unique among all the currently active queries
+ """Specifies the name of the :class:`StreamingQuery` that can be started with
+ :func:`start`. This name must be unique among all the currently active queries
in the associated SparkSession.
.. note:: Experimental.
@@ -1232,21 +1232,21 @@ class DataStreamWriter(object):
:param options: All other string options. You may want to provide a `checkpointLocation`
for most streams, however it is not required for a `memory` stream.
- >>> cq = sdf.writeStream.format('memory').queryName('this_query').start()
- >>> cq.isActive
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.isActive
True
- >>> cq.name
+ >>> sq.name
u'this_query'
- >>> cq.stop()
- >>> cq.isActive
+ >>> sq.stop()
+ >>> sq.isActive
False
- >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+ >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
... queryName='that_query', format='memory')
- >>> cq.name
+ >>> sq.name
u'that_query'
- >>> cq.isActive
+ >>> sq.isActive
True
- >>> cq.stop()
+ >>> sq.stop()
"""
self.options(**options)
if partitionBy is not None:
@@ -1256,9 +1256,9 @@ class DataStreamWriter(object):
if queryName is not None:
self.queryName(queryName)
if path is None:
- return self._cq(self._jwrite.start())
+ return self._sq(self._jwrite.start())
else:
- return self._cq(self._jwrite.start(path))
+ return self._sq(self._jwrite.start(path))
def _test():
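The keyword form of DataStreamWriter.start() exercised in the tests further down is unchanged by this patch apart from the handle type it returns; a hedged sketch, with paths as placeholders:

    # Sketch: start() accepts path/format/queryName/checkpointLocation keywords and
    # now hands back a StreamingQuery via the renamed _sq helper.
    sq = sdf.writeStream.start(path='/tmp/out', format='parquet',
                               queryName='that_query', checkpointLocation='/tmp/chk')  # hypothetical paths
    sq.processAllAvailable()  # block until all currently available input has been processed
    sq.stop()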
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 11c815dd94..6edbd59856 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -565,15 +565,15 @@ class SparkSession(object):
@property
@since(2.0)
def streams(self):
- """Returns a :class:`ContinuousQueryManager` that allows managing all the
- :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+ """Returns a :class:`StreamingQueryManager` that allows managing all the
+ :class:`StreamingQuery` StreamingQueries active on `this` context.
.. note:: Experimental.
- :return: :class:`ContinuousQueryManager`
+ :return: :class:`StreamingQueryManager`
"""
- from pyspark.sql.streaming import ContinuousQueryManager
- return ContinuousQueryManager(self._jsparkSession.streams())
+ from pyspark.sql.streaming import StreamingQueryManager
+ return StreamingQueryManager(self._jsparkSession.streams())
@since(2.0)
def stop(self):
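The manager returned by `spark.streams` is used the same way as before, just under the new class names; a sketch assuming a running query `sq` from the earlier examples:

    # Sketch: enumerate and look up active queries through the StreamingQueryManager.
    sqm = spark.streams                  # StreamingQueryManager (was ContinuousQueryManager)
    print([q.name for q in sqm.active])  # names of all active StreamingQuery instances
    same_query = sqm.get(sq.id)          # look up a query by its integer id
    print(same_query.isActive)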
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1d650946be..ae45c99e4f 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod
from pyspark import since
from pyspark.rdd import ignore_unicode_prefix
-__all__ = ["ContinuousQuery"]
+__all__ = ["StreamingQuery"]
-class ContinuousQuery(object):
+class StreamingQuery(object):
"""
A handle to a query that is executing continuously in the background as new data arrives.
All these methods are thread-safe.
@@ -39,30 +39,30 @@ class ContinuousQuery(object):
.. versionadded:: 2.0
"""
- def __init__(self, jcq):
- self._jcq = jcq
+ def __init__(self, jsq):
+ self._jsq = jsq
@property
@since(2.0)
def id(self):
- """The id of the continuous query. This id is unique across all queries that have been
+ """The id of the streaming query. This id is unique across all queries that have been
started in the current process.
"""
- return self._jcq.id()
+ return self._jsq.id()
@property
@since(2.0)
def name(self):
- """The name of the continuous query. This name is unique across all active queries.
+ """The name of the streaming query. This name is unique across all active queries.
"""
- return self._jcq.name()
+ return self._jsq.name()
@property
@since(2.0)
def isActive(self):
- """Whether this continuous query is currently active or not.
+ """Whether this streaming query is currently active or not.
"""
- return self._jcq.isActive()
+ return self._jsq.isActive()
@since(2.0)
def awaitTermination(self, timeout=None):
@@ -75,14 +75,14 @@ class ContinuousQuery(object):
immediately (if the query was terminated by :func:`stop()`), or throw the exception
immediately (if the query has terminated with exception).
- throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+ throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jcq.awaitTermination(int(timeout * 1000))
+ return self._jsq.awaitTermination(int(timeout * 1000))
else:
- return self._jcq.awaitTermination()
+ return self._jsq.awaitTermination()
@since(2.0)
def processAllAvailable(self):
@@ -92,26 +92,25 @@ class ContinuousQuery(object):
until data that has been synchronously appended data to a stream source prior to invocation.
(i.e. `getOffset` must immediately reflect the addition).
"""
- return self._jcq.processAllAvailable()
+ return self._jsq.processAllAvailable()
@since(2.0)
def stop(self):
- """Stop this continuous query.
+ """Stop this streaming query.
"""
- self._jcq.stop()
+ self._jsq.stop()
-class ContinuousQueryManager(object):
- """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active
- on a :class:`SQLContext`.
+class StreamingQueryManager(object):
+ """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
.. note:: Experimental
.. versionadded:: 2.0
"""
- def __init__(self, jcqm):
- self._jcqm = jcqm
+ def __init__(self, jsqm):
+ self._jsqm = jsqm
@property
@ignore_unicode_prefix
@@ -119,14 +118,14 @@ class ContinuousQueryManager(object):
def active(self):
"""Returns a list of active queries associated with this SQLContext
- >>> cq = df.writeStream.format('memory').queryName('this_query').start()
- >>> cqm = spark.streams
- >>> # get the list of active continuous queries
- >>> [q.name for q in cqm.active]
+ >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sqm = spark.streams
+ >>> # get the list of active streaming queries
+ >>> [q.name for q in sqm.active]
[u'this_query']
- >>> cq.stop()
+ >>> sq.stop()
"""
- return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+ return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
@ignore_unicode_prefix
@since(2.0)
@@ -134,20 +133,20 @@ class ContinuousQueryManager(object):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> cq = df.writeStream.format('memory').queryName('this_query').start()
- >>> cq.name
+ >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.name
u'this_query'
- >>> cq = spark.streams.get(cq.id)
- >>> cq.isActive
+ >>> sq = spark.streams.get(sq.id)
+ >>> sq.isActive
True
- >>> cq = sqlContext.streams.get(cq.id)
- >>> cq.isActive
+ >>> sq = sqlContext.streams.get(sq.id)
+ >>> sq.isActive
True
- >>> cq.stop()
+ >>> sq.stop()
"""
if not isinstance(id, intlike):
raise ValueError("The id for the query must be an integer. Got: %s" % id)
- return ContinuousQuery(self._jcqm.get(id))
+ return StreamingQuery(self._jsqm.get(id))
@since(2.0)
def awaitAnyTermination(self, timeout=None):
@@ -168,14 +167,14 @@ class ContinuousQueryManager(object):
queries, users need to stop all of them after any of them terminates with exception, and
then check the `query.exception()` for each query.
- throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+ throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jcqm.awaitAnyTermination(int(timeout * 1000))
+ return self._jsqm.awaitAnyTermination(int(timeout * 1000))
else:
- return self._jcqm.awaitAnyTermination()
+ return self._jsqm.awaitAnyTermination()
@since(2.0)
def resetTerminated(self):
@@ -184,11 +183,11 @@ class ContinuousQueryManager(object):
>>> spark.streams.resetTerminated()
"""
- self._jcqm.resetTerminated()
+ self._jsqm.resetTerminated()
class Trigger(object):
- """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.
+ """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
.. note:: Experimental
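To illustrate the blocking calls documented above, a hedged sketch; the timeouts are illustrative only and `sq` is the query handle from the earlier sketches.

    # Sketch: awaitTermination takes an optional timeout in seconds and returns
    # False if the query is still running when the timeout expires.
    finished = sq.awaitTermination(2.5)     # wait up to 2.5 seconds
    if not finished:
        sq.stop()
    spark.streams.resetTerminated()         # forget previously terminated queries
    spark.streams.awaitAnyTermination(1.0)  # wait for any active query to terminate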
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fee960a1a7..1d5d691696 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase):
def test_stream_save_options(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+ q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
.format('parquet').outputMode('append').option('path', out).start()
try:
- self.assertEqual(cq.name, 'this_query')
- self.assertTrue(cq.isActive)
- cq.processAllAvailable()
+ self.assertEqual(q.name, 'this_query')
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_stream_save_options_overwrite(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
@@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase):
chk = os.path.join(tmpPath, 'chk')
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
- cq = df.writeStream.option('checkpointLocation', fake1)\
+ q = df.writeStream.option('checkpointLocation', fake1)\
.format('memory').option('path', fake2) \
.queryName('fake_query').outputMode('append') \
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertEqual(cq.name, 'this_query')
- self.assertTrue(cq.isActive)
- cq.processAllAvailable()
+ self.assertEqual(q.name, 'this_query')
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if not f.startswith('.')])
@@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase):
self.assertFalse(os.path.isdir(fake1)) # should not have been created
self.assertFalse(os.path.isdir(fake2)) # should not have been created
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_stream_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream\
+ q = df.writeStream\
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertTrue(cq.isActive)
+ self.assertTrue(q.isActive)
try:
- cq.awaitTermination("hello")
+ q.awaitTermination("hello")
self.fail("Expected a value exception")
except ValueError:
pass
now = time.time()
# test should take at least 2 seconds
- res = cq.awaitTermination(2.6)
+ res = q.awaitTermination(2.6)
duration = time.time() - now
self.assertTrue(duration >= 2)
self.assertFalse(res)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream\
+ q = df.writeStream\
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertTrue(cq.isActive)
+ self.assertTrue(q.isActive)
try:
self.spark._wrapped.streams.awaitAnyTermination("hello")
self.fail("Expected a value exception")
@@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(duration >= 2)
self.assertFalse(res)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_help_command(self):
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 9ddaf78acf..2a85ec01bc 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -45,9 +45,9 @@ class IllegalArgumentException(CapturedException):
"""
-class ContinuousQueryException(CapturedException):
+class StreamingQueryException(CapturedException):
"""
- Exception that stopped a :class:`ContinuousQuery`.
+ Exception that stopped a :class:`StreamingQuery`.
"""
@@ -71,8 +71,8 @@ def capture_sql_exception(f):
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
raise ParseException(s.split(': ', 1)[1], stackTrace)
- if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
- raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
+ if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
+ raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
if s.startswith('java.lang.IllegalArgumentException: '):
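Finally, the renamed exception is what client code catches when a query terminates with an error; a minimal sketch, reusing the query handle `sq` from the earlier examples:

    # Sketch: awaitTermination re-raises a query failure as the renamed StreamingQueryException.
    from pyspark.sql.utils import StreamingQueryException

    try:
        sq.awaitTermination()
    except StreamingQueryException as e:
        print("query terminated with an error: %s" % e)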