From 9a5071996b968148f6b9aba12e0d3fe888d9acd8 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 15 Jun 2016 10:46:02 -0700
Subject: [SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery

Renamed for simplicity, so that it's obvious that it's related to streaming.

Existing unit tests.

Author: Tathagata Das

Closes #13673 from tdas/SPARK-15953.
---
 python/pyspark/sql/context.py    |  8 ++--
 python/pyspark/sql/dataframe.py  |  2 +-
 python/pyspark/sql/readwriter.py | 40 ++++++++++----------
 python/pyspark/sql/session.py    | 10 ++---
 python/pyspark/sql/streaming.py  | 79 ++++++++++++++++++++--------------------
 python/pyspark/sql/tests.py      | 52 +++++++++++++-------------
 python/pyspark/sql/utils.py      |  8 ++--
 7 files changed, 99 insertions(+), 100 deletions(-)

diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index a271afe4cf..8a1a874884 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -444,13 +444,13 @@ class SQLContext(object):
     @property
     @since(2.0)
     def streams(self):
-        """Returns a :class:`ContinuousQueryManager` that allows managing all the
-        :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+        """Returns a :class:`StreamingQueryManager` that allows managing all the
+        :class:`StreamingQuery` StreamingQueries active on `this` context.
 
         .. note:: Experimental.
         """
-        from pyspark.sql.streaming import ContinuousQueryManager
-        return ContinuousQueryManager(self._ssql_ctx.streams())
+        from pyspark.sql.streaming import StreamingQueryManager
+        return StreamingQueryManager(self._ssql_ctx.streams())
 
 
 class HiveContext(SQLContext):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0126faf574..acf9d08b23 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -257,7 +257,7 @@ class DataFrame(object):
     def isStreaming(self):
         """Returns true if this :class:`Dataset` contains one or more sources that continuously
         return data as it arrives. A :class:`Dataset` that reads data from a streaming source
-        must be executed as a :class:`ContinuousQuery` using the :func:`startStream` method in
+        must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in
         :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or
         :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
         source present.
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ad954d0ad8..c982de6840 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
 from pyspark.sql import utils
 
-__all__ = ["DataFrameReader", "DataFrameWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]
 
 
 def to_str(value):
@@ -458,9 +458,9 @@ class DataFrameWriter(object):
         self._spark = df.sql_ctx
         self._jwrite = df._jdf.write()
 
-    def _cq(self, jcq):
-        from pyspark.sql.streaming import ContinuousQuery
-        return ContinuousQuery(jcq)
+    def _sq(self, jsq):
+        from pyspark.sql.streaming import StreamingQuery
+        return StreamingQuery(jsq)
 
     @since(1.4)
     def mode(self, saveMode):
@@ -1094,9 +1094,9 @@ class DataStreamWriter(object):
         self._spark = df.sql_ctx
         self._jwrite = df._jdf.writeStream()
 
-    def _cq(self, jcq):
-        from pyspark.sql.streaming import ContinuousQuery
-        return ContinuousQuery(jcq)
+    def _sq(self, jsq):
+        from pyspark.sql.streaming import StreamingQuery
+        return StreamingQuery(jsq)
 
     @since(2.0)
     def outputMode(self, outputMode):
@@ -1169,8 +1169,8 @@ class DataStreamWriter(object):
 
     @since(2.0)
     def queryName(self, queryName):
-        """Specifies the name of the :class:`ContinuousQuery` that can be started with
-        :func:`startStream`. This name must be unique among all the currently active queries
+        """Specifies the name of the :class:`StreamingQuery` that can be started with
+        :func:`start`. This name must be unique among all the currently active queries
         in the associated SparkSession.
 
         .. note:: Experimental.
@@ -1232,21 +1232,21 @@ class DataStreamWriter(object):
         :param options: All other string options. You may want to provide a `checkpointLocation`
             for most streams, however it is not required for a `memory` stream.
 
-        >>> cq = sdf.writeStream.format('memory').queryName('this_query').start()
-        >>> cq.isActive
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.isActive
         True
-        >>> cq.name
+        >>> sq.name
         u'this_query'
-        >>> cq.stop()
-        >>> cq.isActive
+        >>> sq.stop()
+        >>> sq.isActive
         False
-        >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+        >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
         ...     queryName='that_query', format='memory')
-        >>> cq.name
+        >>> sq.name
         u'that_query'
-        >>> cq.isActive
+        >>> sq.isActive
         True
-        >>> cq.stop()
+        >>> sq.stop()
         """
         self.options(**options)
         if partitionBy is not None:
@@ -1256,9 +1256,9 @@ class DataStreamWriter(object):
         if queryName is not None:
             self.queryName(queryName)
         if path is None:
-            return self._cq(self._jwrite.start())
+            return self._sq(self._jwrite.start())
         else:
-            return self._cq(self._jwrite.start(path))
+            return self._sq(self._jwrite.start(path))
 
 
 def _test():
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 11c815dd94..6edbd59856 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -565,15 +565,15 @@ class SparkSession(object):
     @property
     @since(2.0)
     def streams(self):
-        """Returns a :class:`ContinuousQueryManager` that allows managing all the
-        :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+        """Returns a :class:`StreamingQueryManager` that allows managing all the
+        :class:`StreamingQuery` StreamingQueries active on `this` context.
 
         .. note:: Experimental.
 
-        :return: :class:`ContinuousQueryManager`
+        :return: :class:`StreamingQueryManager`
         """
-        from pyspark.sql.streaming import ContinuousQueryManager
-        return ContinuousQueryManager(self._jsparkSession.streams())
+        from pyspark.sql.streaming import StreamingQueryManager
+        return StreamingQueryManager(self._jsparkSession.streams())
 
     @since(2.0)
     def stop(self):
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1d650946be..ae45c99e4f 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod
 from pyspark import since
 from pyspark.rdd import ignore_unicode_prefix
 
-__all__ = ["ContinuousQuery"]
+__all__ = ["StreamingQuery"]
 
 
-class ContinuousQuery(object):
+class StreamingQuery(object):
     """
     A handle to a query that is executing continuously in the background as new data arrives.
     All these methods are thread-safe.
@@ -39,30 +39,30 @@ class ContinuousQuery(object):
     .. versionadded:: 2.0
     """
 
-    def __init__(self, jcq):
-        self._jcq = jcq
+    def __init__(self, jsq):
+        self._jsq = jsq
 
     @property
     @since(2.0)
     def id(self):
-        """The id of the continuous query. This id is unique across all queries that have been
+        """The id of the streaming query. This id is unique across all queries that have been
         started in the current process.
         """
-        return self._jcq.id()
+        return self._jsq.id()
 
     @property
     @since(2.0)
     def name(self):
-        """The name of the continuous query. This name is unique across all active queries.
+        """The name of the streaming query. This name is unique across all active queries.
         """
-        return self._jcq.name()
+        return self._jsq.name()
 
     @property
     @since(2.0)
     def isActive(self):
-        """Whether this continuous query is currently active or not.
+        """Whether this streaming query is currently active or not.
         """
-        return self._jcq.isActive()
+        return self._jsq.isActive()
 
     @since(2.0)
     def awaitTermination(self, timeout=None):
@@ -75,14 +75,14 @@ class ContinuousQuery(object):
         immediately (if the query was terminated by :func:`stop()`), or throw the exception
         immediately (if the query has terminated with exception).
 
-        throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
         """
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout < 0:
                 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
-            return self._jcq.awaitTermination(int(timeout * 1000))
+            return self._jsq.awaitTermination(int(timeout * 1000))
         else:
-            return self._jcq.awaitTermination()
+            return self._jsq.awaitTermination()
 
     @since(2.0)
     def processAllAvailable(self):
@@ -92,26 +92,25 @@ class ContinuousQuery(object):
         until data that has been synchronously appended data to a stream source prior to invocation.
         (i.e. `getOffset` must immediately reflect the addition).
         """
-        return self._jcq.processAllAvailable()
+        return self._jsq.processAllAvailable()
 
     @since(2.0)
     def stop(self):
-        """Stop this continuous query.
+        """Stop this streaming query.
         """
-        self._jcq.stop()
+        self._jsq.stop()
 
 
-class ContinuousQueryManager(object):
-    """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active
-    on a :class:`SQLContext`.
+class StreamingQueryManager(object):
+    """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
 
     .. note:: Experimental
 
     .. versionadded:: 2.0
     """
 
-    def __init__(self, jcqm):
-        self._jcqm = jcqm
+    def __init__(self, jsqm):
+        self._jsqm = jsqm
 
     @property
     @ignore_unicode_prefix
@@ -119,14 +118,14 @@ class ContinuousQueryManager(object):
     def active(self):
         """Returns a list of active queries associated with this SQLContext
 
-        >>> cq = df.writeStream.format('memory').queryName('this_query').start()
-        >>> cqm = spark.streams
-        >>> # get the list of active continuous queries
-        >>> [q.name for q in cqm.active]
+        >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+        >>> sqm = spark.streams
+        >>> # get the list of active streaming queries
+        >>> [q.name for q in sqm.active]
         [u'this_query']
-        >>> cq.stop()
+        >>> sq.stop()
         """
-        return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+        return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
 
     @ignore_unicode_prefix
     @since(2.0)
@@ -134,20 +133,20 @@ class ContinuousQueryManager(object):
         """Returns an active query from this SQLContext or throws exception if an active query
         with this name doesn't exist.
 
-        >>> cq = df.writeStream.format('memory').queryName('this_query').start()
-        >>> cq.name
+        >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.name
         u'this_query'
-        >>> cq = spark.streams.get(cq.id)
-        >>> cq.isActive
+        >>> sq = spark.streams.get(sq.id)
+        >>> sq.isActive
         True
-        >>> cq = sqlContext.streams.get(cq.id)
-        >>> cq.isActive
+        >>> sq = sqlContext.streams.get(sq.id)
+        >>> sq.isActive
         True
-        >>> cq.stop()
+        >>> sq.stop()
         """
         if not isinstance(id, intlike):
             raise ValueError("The id for the query must be an integer. Got: %s" % id)
-        return ContinuousQuery(self._jcqm.get(id))
+        return StreamingQuery(self._jsqm.get(id))
 
     @since(2.0)
     def awaitAnyTermination(self, timeout=None):
@@ -168,14 +167,14 @@ class ContinuousQueryManager(object):
         queries, users need to stop all of them after any of them terminates with exception, and
         then check the `query.exception()` for each query.
 
-        throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
         """
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout < 0:
                 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
-            return self._jcqm.awaitAnyTermination(int(timeout * 1000))
+            return self._jsqm.awaitAnyTermination(int(timeout * 1000))
         else:
-            return self._jcqm.awaitAnyTermination()
+            return self._jsqm.awaitAnyTermination()
 
     @since(2.0)
     def resetTerminated(self):
@@ -184,11 +183,11 @@ class ContinuousQueryManager(object):
 
         >>> spark.streams.resetTerminated()
         """
-        self._jcqm.resetTerminated()
+        self._jsqm.resetTerminated()
 
 
 class Trigger(object):
-    """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.
+    """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
 
     .. note:: Experimental
 
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fee960a1a7..1d5d691696 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_stream_save_options(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+        q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
             .format('parquet').outputMode('append').option('path', out).start()
         try:
-            self.assertEqual(cq.name, 'this_query')
-            self.assertTrue(cq.isActive)
-            cq.processAllAvailable()
+            self.assertEqual(q.name, 'this_query')
+            self.assertTrue(q.isActive)
+            q.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
                 output_files.extend([f for f in files if not f.startswith('.')])
             self.assertTrue(len(output_files) > 0)
             self.assertTrue(len(os.listdir(chk)) > 0)
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_stream_save_options_overwrite(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
@@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase):
         chk = os.path.join(tmpPath, 'chk')
         fake1 = os.path.join(tmpPath, 'fake1')
         fake2 = os.path.join(tmpPath, 'fake2')
-        cq = df.writeStream.option('checkpointLocation', fake1)\
+        q = df.writeStream.option('checkpointLocation', fake1)\
            .format('memory').option('path', fake2) \
            .queryName('fake_query').outputMode('append') \
            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
-            self.assertEqual(cq.name, 'this_query')
-            self.assertTrue(cq.isActive)
-            cq.processAllAvailable()
+            self.assertEqual(q.name, 'this_query')
+            self.assertTrue(q.isActive)
+            q.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
                 output_files.extend([f for f in files if not f.startswith('.')])
@@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase):
             self.assertFalse(os.path.isdir(fake1))  # should not have been created
             self.assertFalse(os.path.isdir(fake2))  # should not have been created
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_stream_await_termination(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.writeStream\
+        q = df.writeStream\
            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
-            self.assertTrue(cq.isActive)
+            self.assertTrue(q.isActive)
             try:
-                cq.awaitTermination("hello")
+                q.awaitTermination("hello")
                 self.fail("Expected a value exception")
             except ValueError:
                 pass
             now = time.time()
             # test should take at least 2 seconds
-            res = cq.awaitTermination(2.6)
+            res = q.awaitTermination(2.6)
             duration = time.time() - now
             self.assertTrue(duration >= 2)
             self.assertFalse(res)
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_query_manager_await_termination(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.writeStream\
+        q = df.writeStream\
            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
-            self.assertTrue(cq.isActive)
+            self.assertTrue(q.isActive)
             try:
                 self.spark._wrapped.streams.awaitAnyTermination("hello")
                 self.fail("Expected a value exception")
             except ValueError:
                 pass
@@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase):
             self.assertTrue(duration >= 2)
             self.assertFalse(res)
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_help_command(self):
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 9ddaf78acf..2a85ec01bc 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -45,9 +45,9 @@ class IllegalArgumentException(CapturedException):
     """
 
 
-class ContinuousQueryException(CapturedException):
+class StreamingQueryException(CapturedException):
     """
-    Exception that stopped a :class:`ContinuousQuery`.
+    Exception that stopped a :class:`StreamingQuery`.
     """
 
 
@@ -71,8 +71,8 @@ def capture_sql_exception(f):
                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
                 raise ParseException(s.split(': ', 1)[1], stackTrace)
-            if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
-                raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
+            if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
+                raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('java.lang.IllegalArgumentException: '):
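
For orientation only, a minimal sketch of how the renamed API reads from Python once this patch is applied; it is not part of the commit. The SparkSession setup is an assumption, the input path is borrowed from the test suite above (it assumes a Spark source checkout), and the query and manager calls mirror the doctests in readwriter.py and streaming.py.

    # Illustrative sketch -- assumes a Spark 2.0-era build that includes this rename.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("streaming_query_demo").getOrCreate()

    # Streaming DataFrame over a directory of text files (path taken from tests.py above).
    sdf = spark.readStream.format('text').load('python/test_support/sql/streaming')

    # DataStreamWriter.start() now returns a StreamingQuery (formerly ContinuousQuery).
    sq = sdf.writeStream.format('memory').queryName('this_query').start()
    print(sq.name)       # 'this_query'
    print(sq.isActive)   # True

    # Active queries are tracked by the StreamingQueryManager exposed as spark.streams.
    print([q.name for q in spark.streams.active])

    sq.stop()
    spark.stop()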