33 files changed, 251 insertions, 254 deletions
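The patch below renames the continuous-query API to the streaming-query API (ContinuousQuery -> StreamingQuery, ContinuousQueryManager -> StreamingQueryManager, ContinuousQueryException -> StreamingQueryException, and so on) across PySpark and Spark SQL. As a quick orientation before the diff, here is a minimal PySpark sketch of the renamed surface; the input path and query name are placeholders, not part of the patch:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("rename-demo").getOrCreate()

    # A streaming DataFrame; isStreaming is True for it.
    sdf = spark.readStream.format("text").load("/tmp/streaming-input")

    # start() now returns a StreamingQuery (formerly ContinuousQuery).
    query = sdf.writeStream \
        .format("memory") \
        .queryName("demo_query") \
        .start()

    # spark.streams is now a StreamingQueryManager (formerly ContinuousQueryManager).
    print([q.name for q in spark.streams.active])

    query.stop()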
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index a271afe4cf..8a1a874884 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -444,13 +444,13 @@ class SQLContext(object): @property @since(2.0) def streams(self): - """Returns a :class:`ContinuousQueryManager` that allows managing all the - :class:`ContinuousQuery` ContinuousQueries active on `this` context. + """Returns a :class:`StreamingQueryManager` that allows managing all the + :class:`StreamingQuery` StreamingQueries active on `this` context. .. note:: Experimental. """ - from pyspark.sql.streaming import ContinuousQueryManager - return ContinuousQueryManager(self._ssql_ctx.streams()) + from pyspark.sql.streaming import StreamingQueryManager + return StreamingQueryManager(self._ssql_ctx.streams()) class HiveContext(SQLContext): diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0126faf574..acf9d08b23 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -257,7 +257,7 @@ class DataFrame(object): def isStreaming(self): """Returns true if this :class:`Dataset` contains one or more sources that continuously return data as it arrives. A :class:`Dataset` that reads data from a streaming source - must be executed as a :class:`ContinuousQuery` using the :func:`startStream` method in + must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming source present. diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index ad954d0ad8..c982de6840 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq from pyspark.sql.types import * from pyspark.sql import utils -__all__ = ["DataFrameReader", "DataFrameWriter"] +__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"] def to_str(value): @@ -458,9 +458,9 @@ class DataFrameWriter(object): self._spark = df.sql_ctx self._jwrite = df._jdf.write() - def _cq(self, jcq): - from pyspark.sql.streaming import ContinuousQuery - return ContinuousQuery(jcq) + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) @since(1.4) def mode(self, saveMode): @@ -1094,9 +1094,9 @@ class DataStreamWriter(object): self._spark = df.sql_ctx self._jwrite = df._jdf.writeStream() - def _cq(self, jcq): - from pyspark.sql.streaming import ContinuousQuery - return ContinuousQuery(jcq) + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) @since(2.0) def outputMode(self, outputMode): @@ -1169,8 +1169,8 @@ class DataStreamWriter(object): @since(2.0) def queryName(self, queryName): - """Specifies the name of the :class:`ContinuousQuery` that can be started with - :func:`startStream`. This name must be unique among all the currently active queries + """Specifies the name of the :class:`StreamingQuery` that can be started with + :func:`start`. This name must be unique among all the currently active queries in the associated SparkSession. .. note:: Experimental. @@ -1232,21 +1232,21 @@ class DataStreamWriter(object): :param options: All other string options. You may want to provide a `checkpointLocation` for most streams, however it is not required for a `memory` stream. 
- >>> cq = sdf.writeStream.format('memory').queryName('this_query').start() - >>> cq.isActive + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.isActive True - >>> cq.name + >>> sq.name u'this_query' - >>> cq.stop() - >>> cq.isActive + >>> sq.stop() + >>> sq.isActive False - >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start( + >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( ... queryName='that_query', format='memory') - >>> cq.name + >>> sq.name u'that_query' - >>> cq.isActive + >>> sq.isActive True - >>> cq.stop() + >>> sq.stop() """ self.options(**options) if partitionBy is not None: @@ -1256,9 +1256,9 @@ class DataStreamWriter(object): if queryName is not None: self.queryName(queryName) if path is None: - return self._cq(self._jwrite.start()) + return self._sq(self._jwrite.start()) else: - return self._cq(self._jwrite.start(path)) + return self._sq(self._jwrite.start(path)) def _test(): diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 11c815dd94..6edbd59856 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -565,15 +565,15 @@ class SparkSession(object): @property @since(2.0) def streams(self): - """Returns a :class:`ContinuousQueryManager` that allows managing all the - :class:`ContinuousQuery` ContinuousQueries active on `this` context. + """Returns a :class:`StreamingQueryManager` that allows managing all the + :class:`StreamingQuery` StreamingQueries active on `this` context. .. note:: Experimental. - :return: :class:`ContinuousQueryManager` + :return: :class:`StreamingQueryManager` """ - from pyspark.sql.streaming import ContinuousQueryManager - return ContinuousQueryManager(self._jsparkSession.streams()) + from pyspark.sql.streaming import StreamingQueryManager + return StreamingQueryManager(self._jsparkSession.streams()) @since(2.0) def stop(self): diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1d650946be..ae45c99e4f 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod from pyspark import since from pyspark.rdd import ignore_unicode_prefix -__all__ = ["ContinuousQuery"] +__all__ = ["StreamingQuery"] -class ContinuousQuery(object): +class StreamingQuery(object): """ A handle to a query that is executing continuously in the background as new data arrives. All these methods are thread-safe. @@ -39,30 +39,30 @@ class ContinuousQuery(object): .. versionadded:: 2.0 """ - def __init__(self, jcq): - self._jcq = jcq + def __init__(self, jsq): + self._jsq = jsq @property @since(2.0) def id(self): - """The id of the continuous query. This id is unique across all queries that have been + """The id of the streaming query. This id is unique across all queries that have been started in the current process. """ - return self._jcq.id() + return self._jsq.id() @property @since(2.0) def name(self): - """The name of the continuous query. This name is unique across all active queries. + """The name of the streaming query. This name is unique across all active queries. """ - return self._jcq.name() + return self._jsq.name() @property @since(2.0) def isActive(self): - """Whether this continuous query is currently active or not. + """Whether this streaming query is currently active or not. 
""" - return self._jcq.isActive() + return self._jsq.isActive() @since(2.0) def awaitTermination(self, timeout=None): @@ -75,14 +75,14 @@ class ContinuousQuery(object): immediately (if the query was terminated by :func:`stop()`), or throw the exception immediately (if the query has terminated with exception). - throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jcq.awaitTermination(int(timeout * 1000)) + return self._jsq.awaitTermination(int(timeout * 1000)) else: - return self._jcq.awaitTermination() + return self._jsq.awaitTermination() @since(2.0) def processAllAvailable(self): @@ -92,26 +92,25 @@ class ContinuousQuery(object): until data that has been synchronously appended data to a stream source prior to invocation. (i.e. `getOffset` must immediately reflect the addition). """ - return self._jcq.processAllAvailable() + return self._jsq.processAllAvailable() @since(2.0) def stop(self): - """Stop this continuous query. + """Stop this streaming query. """ - self._jcq.stop() + self._jsq.stop() -class ContinuousQueryManager(object): - """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active - on a :class:`SQLContext`. +class StreamingQueryManager(object): + """A class to manage all the :class:`StreamingQuery` StreamingQueries active. .. note:: Experimental .. versionadded:: 2.0 """ - def __init__(self, jcqm): - self._jcqm = jcqm + def __init__(self, jsqm): + self._jsqm = jsqm @property @ignore_unicode_prefix @@ -119,14 +118,14 @@ class ContinuousQueryManager(object): def active(self): """Returns a list of active queries associated with this SQLContext - >>> cq = df.writeStream.format('memory').queryName('this_query').start() - >>> cqm = spark.streams - >>> # get the list of active continuous queries - >>> [q.name for q in cqm.active] + >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sqm = spark.streams + >>> # get the list of active streaming queries + >>> [q.name for q in sqm.active] [u'this_query'] - >>> cq.stop() + >>> sq.stop() """ - return [ContinuousQuery(jcq) for jcq in self._jcqm.active()] + return [StreamingQuery(jsq) for jsq in self._jsqm.active()] @ignore_unicode_prefix @since(2.0) @@ -134,20 +133,20 @@ class ContinuousQueryManager(object): """Returns an active query from this SQLContext or throws exception if an active query with this name doesn't exist. - >>> cq = df.writeStream.format('memory').queryName('this_query').start() - >>> cq.name + >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sq.name u'this_query' - >>> cq = spark.streams.get(cq.id) - >>> cq.isActive + >>> sq = spark.streams.get(sq.id) + >>> sq.isActive True - >>> cq = sqlContext.streams.get(cq.id) - >>> cq.isActive + >>> sq = sqlContext.streams.get(sq.id) + >>> sq.isActive True - >>> cq.stop() + >>> sq.stop() """ if not isinstance(id, intlike): raise ValueError("The id for the query must be an integer. 
Got: %s" % id) - return ContinuousQuery(self._jcqm.get(id)) + return StreamingQuery(self._jsqm.get(id)) @since(2.0) def awaitAnyTermination(self, timeout=None): @@ -168,14 +167,14 @@ class ContinuousQueryManager(object): queries, users need to stop all of them after any of them terminates with exception, and then check the `query.exception()` for each query. - throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jcqm.awaitAnyTermination(int(timeout * 1000)) + return self._jsqm.awaitAnyTermination(int(timeout * 1000)) else: - return self._jcqm.awaitAnyTermination() + return self._jsqm.awaitAnyTermination() @since(2.0) def resetTerminated(self): @@ -184,11 +183,11 @@ class ContinuousQueryManager(object): >>> spark.streams.resetTerminated() """ - self._jcqm.resetTerminated() + self._jsqm.resetTerminated() class Trigger(object): - """Used to indicate how often results should be produced by a :class:`ContinuousQuery`. + """Used to indicate how often results should be produced by a :class:`StreamingQuery`. .. note:: Experimental diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index fee960a1a7..1d5d691696 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase): def test_stream_save_options(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ + q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ .format('parquet').outputMode('append').option('path', out).start() try: - self.assertEqual(cq.name, 'this_query') - self.assertTrue(cq.isActive) - cq.processAllAvailable() + self.assertEqual(q.name, 'this_query') + self.assertTrue(q.isActive) + q.processAllAvailable() output_files = [] for _, _, files in os.walk(out): output_files.extend([f for f in files if not f.startswith('.')]) self.assertTrue(len(output_files) > 0) self.assertTrue(len(os.listdir(chk)) > 0) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_stream_save_options_overwrite(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) @@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase): chk = os.path.join(tmpPath, 'chk') fake1 = os.path.join(tmpPath, 'fake1') fake2 = os.path.join(tmpPath, 'fake2') - cq = df.writeStream.option('checkpointLocation', fake1)\ + q = df.writeStream.option('checkpointLocation', fake1)\ .format('memory').option('path', fake2) \ .queryName('fake_query').outputMode('append') \ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertEqual(cq.name, 'this_query') - self.assertTrue(cq.isActive) - 
cq.processAllAvailable() + self.assertEqual(q.name, 'this_query') + self.assertTrue(q.isActive) + q.processAllAvailable() output_files = [] for _, _, files in os.walk(out): output_files.extend([f for f in files if not f.startswith('.')]) @@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase): self.assertFalse(os.path.isdir(fake1)) # should not have been created self.assertFalse(os.path.isdir(fake2)) # should not have been created finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_stream_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream\ + q = df.writeStream\ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertTrue(cq.isActive) + self.assertTrue(q.isActive) try: - cq.awaitTermination("hello") + q.awaitTermination("hello") self.fail("Expected a value exception") except ValueError: pass now = time.time() # test should take at least 2 seconds - res = cq.awaitTermination(2.6) + res = q.awaitTermination(2.6) duration = time.time() - now self.assertTrue(duration >= 2) self.assertFalse(res) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_query_manager_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream\ + q = df.writeStream\ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertTrue(cq.isActive) + self.assertTrue(q.isActive) try: self.spark._wrapped.streams.awaitAnyTermination("hello") self.fail("Expected a value exception") @@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(duration >= 2) self.assertFalse(res) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_help_command(self): diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 9ddaf78acf..2a85ec01bc 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -45,9 +45,9 @@ class IllegalArgumentException(CapturedException): """ -class ContinuousQueryException(CapturedException): +class StreamingQueryException(CapturedException): """ - Exception that stopped a :class:`ContinuousQuery`. + Exception that stopped a :class:`StreamingQuery`. 
""" @@ -71,8 +71,8 @@ def capture_sql_exception(f): raise AnalysisException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '): raise ParseException(s.split(': ', 1)[1], stackTrace) - if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '): - raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace) + if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '): + raise StreamingQueryException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '): raise QueryExecutionException(s.split(': ', 1)[1], stackTrace) if s.startswith('java.lang.IllegalArgumentException: '): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f9db325ea2..fba4066af6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython -import org.apache.spark.sql.streaming.{ContinuousQuery, DataStreamWriter} +import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -455,7 +455,7 @@ class Dataset[T] private[sql]( /** * Returns true if this Dataset contains one or more sources that continuously * return data as it arrives. A Dataset that reads data from a streaming source - * must be executed as a [[ContinuousQuery]] using the `startStream()` method in + * must be executed as a [[StreamingQuery]] using the `startStream()` method in * [[DataFrameWriter]]. Methods that return a single answer, e.g. `count()` or * `collect()`, will throw an [[AnalysisException]] when there is a streaming * source present. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala index 09f07426a6..f56b25b557 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.streaming.ContinuousQuery +import org.apache.spark.sql.streaming.StreamingQuery /** * :: Experimental :: - * A class to consume data generated by a [[ContinuousQuery]]. Typically this is used to send the + * A class to consume data generated by a [[StreamingQuery]]. Typically this is used to send the * generated data to external systems. Each partition will use a new deserialized instance, so you * usually should do all the initialization (e.g. opening a connection or initiating a transaction) * in the `open` method. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 33f62915df..e7627ac2c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.streaming.{ContinuousQueryManager, DataStreamReader} +import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ExecutionListenerManager @@ -716,12 +716,12 @@ class SQLContext private[sql](val sparkSession: SparkSession) } /** - * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context. + * Returns a [[StreamingQueryManager]] that allows managing all the + * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active on `this` context. * * @since 2.0.0 */ - def streams: ContinuousQueryManager = sparkSession.streams + def streams: StreamingQueryManager = sparkSession.streams /** * Returns the names of tables in the current database as an array. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 9137a735dd..251f47d5fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -178,13 +178,13 @@ class SparkSession private( /** * :: Experimental :: - * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[ContinuousQuery ContinuousQueries]] active on `this`. + * Returns a [[StreamingQueryManager]] that allows managing all the + * [[StreamingQuery StreamingQueries]] active on `this`. 
* * @since 2.0.0 */ @Experimental - def streams: ContinuousQueryManager = sessionState.continuousQueryManager + def streams: StreamingQueryManager = sessionState.streamingQueryManager /** * Start a new session with isolated SQL configurations, temporary tables, registered diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 31c9f1aef2..6cb1a44a20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -47,7 +47,7 @@ private[sql] object SQLExecution { val r = try { // sparkContext.getCallSite() would first try to pick up any call site that was previously // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on - // continuous queries would give us call site like "run at <unknown>:0" + // streaming queries would give us call site like "run at <unknown>:0" val callSite = sparkSession.sparkContext.getCallSite() sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index d1261dd6ca..60466e2830 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.ContinuousQuery +import org.apache.spark.sql.streaming.StreamingQuery /** * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting @@ -225,7 +225,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { /** * Used to plan aggregation queries that are computed incrementally as part of a - * [[ContinuousQuery]]. Currently this rule is injected into the planner + * [[StreamingQuery]]. Currently this rule is injected into the planner * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5095fe7fca..4aefd39b36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -52,9 +52,9 @@ class StreamExecution( val trigger: Trigger, private[sql] val triggerClock: Clock, val outputMode: OutputMode) - extends ContinuousQuery with Logging { + extends StreamingQuery with Logging { - import org.apache.spark.sql.streaming.ContinuousQueryListener._ + import org.apache.spark.sql.streaming.StreamingQueryListener._ /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. 
@@ -101,7 +101,7 @@ class StreamExecution( private[sql] var lastExecution: QueryExecution = null @volatile - private[sql] var streamDeathCause: ContinuousQueryException = null + private[sql] var streamDeathCause: StreamingQueryException = null /* Get the call site in the caller thread; will pass this into the micro batch thread */ private val callSite = Utils.getCallSite() @@ -140,8 +140,8 @@ class StreamExecution( override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString) - /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */ - override def exception: Option[ContinuousQueryException] = Option(streamDeathCause) + /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */ + override def exception: Option[StreamingQueryException] = Option(streamDeathCause) /** Returns the path of a file with `name` in the checkpoint directory. */ private def checkpointFile(name: String): String = @@ -199,7 +199,7 @@ class StreamExecution( } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() case NonFatal(e) => - streamDeathCause = new ContinuousQueryException( + streamDeathCause = new StreamingQueryException( this, s"Query $name terminated with exception: ${e.getMessage}", e, @@ -227,7 +227,7 @@ class StreamExecution( private def populateStartOffsets(): Unit = { offsetLog.getLatest() match { case Some((batchId, nextOffsets)) => - logInfo(s"Resuming continuous query, starting with batch $batchId") + logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) logDebug(s"Found possibly uncommitted offsets $availableOffsets") @@ -239,7 +239,7 @@ class StreamExecution( } case None => // We are starting this stream for the first time. 
- logInfo(s"Starting new continuous query.") + logInfo(s"Starting new streaming query.") currentBatchId = 0 constructNextBatch() } @@ -383,7 +383,7 @@ class StreamExecution( postEvent(new QueryProgress(this.toInfo)) } - private def postEvent(event: ContinuousQueryListener.Event) { + private def postEvent(event: StreamingQueryListener.Event) { sparkSession.streams.postListenerEvent(event) } @@ -468,7 +468,7 @@ class StreamExecution( } override def toString: String = { - s"Continuous Query - $name [state = $state]" + s"Streaming Query - $name [state = $state]" } def toDebugString: String = { @@ -476,7 +476,7 @@ class StreamExecution( "Error:\n" + stackTraceToString(streamDeathCause.cause) } else "" s""" - |=== Continuous Query === + |=== Streaming Query === |Name: $name |Current Offsets: $committedOffsets | @@ -490,8 +490,8 @@ class StreamExecution( """.stripMargin } - private def toInfo: ContinuousQueryInfo = { - new ContinuousQueryInfo( + private def toInfo: StreamingQueryInfo = { + new StreamingQueryInfo( this.name, this.id, this.sourceStatuses, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index f50951f9bd..1e663956f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -18,27 +18,27 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} -import org.apache.spark.sql.streaming.ContinuousQueryListener +import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.util.ListenerBus /** - * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received - * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with - * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them - * to ContinuousQueryListener. + * A bus to forward events to [[StreamingQueryListener]]s. This one will send received + * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with + * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them + * to StreamingQueryListener. */ -class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) - extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { +class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) + extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] { - import ContinuousQueryListener._ + import StreamingQueryListener._ sparkListenerBus.addListener(this) /** - * Post a ContinuousQueryListener event to the Spark listener bus asynchronously. This event will - * be dispatched to all ContinuousQueryListener in the thread of the Spark listener bus. + * Post a StreamingQueryListener event to the Spark listener bus asynchronously. This event will + * be dispatched to all StreamingQueryListener in the thread of the Spark listener bus. 
*/ - def post(event: ContinuousQueryListener.Event) { + def post(event: StreamingQueryListener.Event) { event match { case s: QueryStarted => postToAll(s) @@ -49,15 +49,15 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { - case e: ContinuousQueryListener.Event => + case e: StreamingQueryListener.Event => postToAll(e) case _ => } } override protected def doPostEvent( - listener: ContinuousQueryListener, - event: ContinuousQueryListener.Event): Unit = { + listener: StreamingQueryListener, + event: StreamingQueryListener.Event): Unit = { event match { case queryStarted: QueryStarted => listener.onQueryStarted(queryStarted) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 02608b0dce..e8bd489be3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2587,7 +2587,7 @@ object functions { * 09:00:25-09:01:25 ... * }}} * - * For a continuous query, you may use the function `current_timestamp` to generate windows on + * For a streaming query, you may use the function `current_timestamp` to generate windows on * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. @@ -2641,7 +2641,7 @@ object functions { * 09:00:20-09:01:20 ... * }}} * - * For a continuous query, you may use the function `current_timestamp` to generate windows on + * For a streaming query, you may use the function `current_timestamp` to generate windows on * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. @@ -2683,7 +2683,7 @@ object functions { * 09:02:00-09:03:00 ... * }}} * - * For a continuous query, you may use the function `current_timestamp` to generate windows on + * For a streaming query, you may use the function `current_timestamp` to generate windows on * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. 
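The functions.scala doc hunks above now describe window() in terms of a "streaming query" and suggest current_timestamp for processing-time windows. A small PySpark sketch of that usage, grouping a stream by tumbling processing-time windows; the socket host and port are placeholders:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import window, current_timestamp

    spark = SparkSession.builder.getOrCreate()
    lines = spark.readStream.format("socket") \
        .option("host", "localhost").option("port", 9999).load()

    # Tumbling 1-minute windows keyed on processing time, as the docs describe.
    counts = lines.groupBy(window(current_timestamp(), "1 minute")).count()

    query = counts.writeStream.outputMode("complete") \
        .format("memory").queryName("windowed_counts").start()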
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6978b506ac..4b8916f59c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -484,14 +484,14 @@ object SQLConf { .createWithDefault(2) val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation") - .doc("The default location for storing checkpoint data for continuously executing queries.") + .doc("The default location for storing checkpoint data for streaming queries.") .stringConf .createOptional val UNSUPPORTED_OPERATION_CHECK_ENABLED = SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck") .internal() - .doc("When true, the logical plan for continuous query will be checked for unsupported" + + .doc("When true, the logical plan for streaming query will be checked for unsupported" + " operations.") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b43095041b..59efa81275 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource} -import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager} +import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager} import org.apache.spark.sql.util.ExecutionListenerManager @@ -143,10 +143,10 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager /** - * Interface to start and stop [[ContinuousQuery]]s. + * Interface to start and stop [[StreamingQuery]]s. */ - lazy val continuousQueryManager: ContinuousQueryManager = { - new ContinuousQueryManager(sparkSession) + lazy val streamingQueryManager: StreamingQueryManager = { + new StreamingQueryManager(sparkSession) } private val jarClassLoader: NonClosableMutableURLClassLoader = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b035ff7938..197707404e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -109,7 +109,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { /** * :: Experimental :: - * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`. + * Specifies the name of the [[StreamingQuery]] that can be started with `startStream()`. * This name must be unique among all the currently active queries in the associated SQLContext. * * @since 2.0.0 @@ -221,26 +221,26 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { /** * :: Experimental :: * Starts the execution of the streaming query, which will continually output results to the given - * path as new data arrives. 
The returned [[ContinuousQuery]] object can be used to interact with + * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with * the stream. * * @since 2.0.0 */ @Experimental - def start(path: String): ContinuousQuery = { + def start(path: String): StreamingQuery = { option("path", path).start() } /** * :: Experimental :: * Starts the execution of the streaming query, which will continually output results to the given - * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with + * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with * the stream. * * @since 2.0.0 */ @Experimental - def start(): ContinuousQuery = { + def start(): StreamingQuery = { if (source == "memory") { assertNotPartitioned("memory") if (extraOptions.get("queryName").isEmpty) { @@ -249,7 +249,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) - val query = df.sparkSession.sessionState.continuousQueryManager.startQuery( + val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, @@ -263,7 +263,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { } else if (source == "foreach") { assertNotPartitioned("foreach") val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc) - df.sparkSession.sessionState.continuousQueryManager.startQuery( + df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, @@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { className = source, options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) - df.sparkSession.sessionState.continuousQueryManager.startQuery( + df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 1e0a47deca..dc81a5b180 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.SparkSession * @since 2.0.0 */ @Experimental -trait ContinuousQuery { +trait StreamingQuery { /** * Returns the name of the query. This name is unique across all active queries. This can be @@ -57,10 +57,10 @@ trait ContinuousQuery { def isActive: Boolean /** - * Returns the [[ContinuousQueryException]] if the query was terminated by an exception. + * Returns the [[StreamingQueryException]] if the query was terminated by an exception. * @since 2.0.0 */ - def exception: Option[ContinuousQueryException] + def exception: Option[StreamingQueryException] /** * Returns current status of all the sources. @@ -79,7 +79,7 @@ trait ContinuousQuery { * immediately (if the query was terminated by `stop()`), or throw the exception * immediately (if the query has terminated with exception). * - * @throws ContinuousQueryException, if `this` query has terminated with an exception. + * @throws StreamingQueryException, if `this` query has terminated with an exception. 
* * @since 2.0.0 */ @@ -95,7 +95,7 @@ trait ContinuousQuery { * `true` immediately (if the query was terminated by `stop()`), or throw the exception * immediately (if the query has terminated with exception). * - * @throws ContinuousQueryException, if `this` query has terminated with an exception + * @throws StreamingQueryException, if `this` query has terminated with an exception * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 5196c5a537..90f95ca9d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} /** * :: Experimental :: - * Exception that stopped a [[ContinuousQuery]]. + * Exception that stopped a [[StreamingQuery]]. * @param query Query that caused the exception * @param message Message of this exception * @param cause Internal cause of this exception @@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} * @since 2.0.0 */ @Experimental -class ContinuousQueryException private[sql]( - @transient val query: ContinuousQuery, +class StreamingQueryException private[sql]( + @transient val query: StreamingQuery, val message: String, val cause: Throwable, val startOffset: Option[Offset] = None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala index 19f22704ba..1af2668817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala @@ -21,16 +21,16 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: - * A class used to report information about the progress of a [[ContinuousQuery]]. + * A class used to report information about the progress of a [[StreamingQuery]]. * - * @param name The [[ContinuousQuery]] name. This name is unique across all active queries. - * @param id The [[ContinuousQuery]] id. This id is unique across + * @param name The [[StreamingQuery]] name. This name is unique across all active queries. + * @param id The [[StreamingQuery]] id. This id is unique across * all queries that have been started in the current process. - * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources. - * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink. + * @param sourceStatuses The current statuses of the [[StreamingQuery]]'s sources. + * @param sinkStatus The current status of the [[StreamingQuery]]'s sink. 
*/ @Experimental -class ContinuousQueryInfo private[sql]( +class StreamingQueryInfo private[sql]( val name: String, val id: Long, val sourceStatuses: Seq[SourceStatus], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index dd311148e0..c43de58faa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -22,22 +22,22 @@ import org.apache.spark.scheduler.SparkListenerEvent /** * :: Experimental :: - * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]]. + * Interface for listening to events related to [[StreamingQuery StreamingQueries]]. * @note The methods are not thread-safe as they may be called from different threads. * * @since 2.0.0 */ @Experimental -abstract class ContinuousQueryListener { +abstract class StreamingQueryListener { - import ContinuousQueryListener._ + import StreamingQueryListener._ /** * Called when a query is started. * @note This is called synchronously with * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]], * that is, `onQueryStart` will be called on all listeners before - * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please + * `DataFrameWriter.startStream()` returns the corresponding [[StreamingQuery]]. Please * don't block this method as it will block your query. * @since 2.0.0 */ @@ -46,9 +46,9 @@ abstract class ContinuousQueryListener { /** * Called when there is some status update (ingestion rate updated, etc.) * - * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be - * latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]] - * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]] + * @note This method is asynchronous. The status in [[StreamingQuery]] will always be + * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]] + * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]] * is terminated when you are processing [[QueryProgress]]. * @since 2.0.0 */ @@ -64,15 +64,15 @@ abstract class ContinuousQueryListener { /** * :: Experimental :: - * Companion object of [[ContinuousQueryListener]] that defines the listener events. + * Companion object of [[StreamingQueryListener]] that defines the listener events. * @since 2.0.0 */ @Experimental -object ContinuousQueryListener { +object StreamingQueryListener { /** * :: Experimental :: - * Base type of [[ContinuousQueryListener]] events + * Base type of [[StreamingQueryListener]] events * @since 2.0.0 */ @Experimental @@ -84,7 +84,7 @@ object ContinuousQueryListener { * @since 2.0.0 */ @Experimental - class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event + class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends Event /** * :: Experimental :: @@ -92,14 +92,14 @@ object ContinuousQueryListener { * @since 2.0.0 */ @Experimental - class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event + class QueryProgress private[sql](val queryInfo: StreamingQueryInfo) extends Event /** * :: Experimental :: * Event representing that termination of a query * * @param queryInfo Information about the status of the query. 
- * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated + * @param exception The exception message of the [[StreamingQuery]] if the query was terminated * with an exception. Otherwise, it will be `None`. * @param stackTrace The stack trace of the exception if the query was terminated with an * exception. It will be empty if there was no error. @@ -107,7 +107,7 @@ object ContinuousQueryListener { */ @Experimental class QueryTerminated private[sql]( - val queryInfo: ContinuousQueryInfo, + val queryInfo: StreamingQueryInfo, val exception: Option[String], val stackTrace: Seq[StackTraceElement]) extends Event } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 0f4a9c9975..bae7f56a23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -31,28 +31,28 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} /** * :: Experimental :: - * A class to manage all the [[ContinuousQuery]] active on a [[SparkSession]]. + * A class to manage all the [[StreamingQuery]] active on a [[SparkSession]]. * * @since 2.0.0 */ @Experimental -class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { +class StreamingQueryManager private[sql] (sparkSession: SparkSession) { private[sql] val stateStoreCoordinator = StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) - private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus) - private val activeQueries = new mutable.HashMap[Long, ContinuousQuery] + private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus) + private val activeQueries = new mutable.HashMap[Long, StreamingQuery] private val activeQueriesLock = new Object private val awaitTerminationLock = new Object - private var lastTerminatedQuery: ContinuousQuery = null + private var lastTerminatedQuery: StreamingQuery = null /** * Returns a list of active queries associated with this SQLContext * * @since 2.0.0 */ - def active: Array[ContinuousQuery] = activeQueriesLock.synchronized { + def active: Array[StreamingQuery] = activeQueriesLock.synchronized { activeQueries.values.toArray } @@ -61,7 +61,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { * * @since 2.0.0 */ - def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized { + def get(id: Long): StreamingQuery = activeQueriesLock.synchronized { activeQueries.get(id).orNull } @@ -81,7 +81,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { * users need to stop all of them after any of them terminates with exception, and then check the * `query.exception()` for each query. * - * @throws ContinuousQueryException, if any query has terminated with an exception + * @throws StreamingQueryException, if any query has terminated with an exception * * @since 2.0.0 */ @@ -113,7 +113,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { * users need to stop all of them after any of them terminates with exception, and then check the * `query.exception()` for each query. 
* - * @throws ContinuousQueryException, if any query has terminated with an exception + * @throws StreamingQueryException, if any query has terminated with an exception * * @since 2.0.0 */ @@ -146,31 +146,31 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { } /** - * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of - * [[ContinuousQuery]]. + * Register a [[StreamingQueryListener]] to receive up-calls for life cycle events of + * [[StreamingQuery]]. * * @since 2.0.0 */ - def addListener(listener: ContinuousQueryListener): Unit = { + def addListener(listener: StreamingQueryListener): Unit = { listenerBus.addListener(listener) } /** - * Deregister a [[ContinuousQueryListener]]. + * Deregister a [[StreamingQueryListener]]. * * @since 2.0.0 */ - def removeListener(listener: ContinuousQueryListener): Unit = { + def removeListener(listener: StreamingQueryListener): Unit = { listenerBus.removeListener(listener) } /** Post a listener event */ - private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = { + private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = { listenerBus.post(event) } /** - * Start a [[ContinuousQuery]]. + * Start a [[StreamingQuery]]. * @param userSpecifiedName Query name optionally specified by the user. * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. * @param df Streaming DataFrame. @@ -193,7 +193,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { useTempCheckpointLocation: Boolean = false, recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), - triggerClock: Clock = new SystemClock()): ContinuousQuery = { + triggerClock: Clock = new SystemClock()): StreamingQuery = { activeQueriesLock.synchronized { val id = StreamExecution.nextId val name = userSpecifiedName.getOrElse(s"query-$id") @@ -264,8 +264,8 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { } } - /** Notify (by the ContinuousQuery) that the query has been terminated */ - private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = { + /** Notify (by the StreamingQuery) that the query has been terminated */ + private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = { activeQueriesLock.synchronized { activeQueries -= terminatedQuery.id } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala index d3fdbac576..55be7a711a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala @@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.CalendarInterval /** * :: Experimental :: - * Used to indicate how often results should be produced by a [[ContinuousQuery]]. + * Used to indicate how often results should be produced by a [[StreamingQuery]]. * * @since 2.0.0 */ @@ -65,7 +65,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { /** * :: Experimental :: - * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s. + * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s. 
  *
  * @since 2.0.0
  */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index a5acc970e3..9d0a2b3d5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -124,7 +124,7 @@ class FileStreamSinkSuite extends StreamTest {
     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

-    var query: ContinuousQuery = null
+    var query: StreamingQuery = null

     try {
       query =
@@ -156,7 +156,7 @@ class FileStreamSinkSuite extends StreamTest {
     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

-    var query: ContinuousQuery = null
+    var query: StreamingQuery = null

     try {
       query =
@@ -240,7 +240,7 @@ class FileStreamSinkSuite extends StreamTest {
     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

-    var query: ContinuousQuery = null
+    var query: StreamingQuery = null

     try {
       val writer =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 0e157cf726..f9e236c449 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -57,7 +57,7 @@ class FileStressSuite extends StreamTest {
     @volatile
     var continue = true
     @volatile
-    var stream: ContinuousQuery = null
+    var stream: StreamingQuery = null

     val writer = new Thread("stream writer") {
       override def run(): Unit = {
@@ -100,7 +100,7 @@
     val input = spark.readStream.format("text").load(inputDir)

-    def startStream(): ContinuousQuery = {
+    def startStream(): StreamingQuery = {
       val output = input
         .repartition(5)
         .as[String]
@@ -139,7 +139,7 @@
       try {
         stream.awaitTermination()
       } catch {
-        case ce: ContinuousQueryException =>
+        case ce: StreamingQueryException =>
           failures += 1
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index cbfa6ff07d..720ffaf732 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -353,7 +353,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         case ef: ExpectFailure[_] =>
           verify(currentStream != null, "can not expect failure when stream is not running")
           try failAfter(streamingTimeout) {
-            val thrownException = intercept[ContinuousQueryException] {
+            val thrownException = intercept[StreamingQueryException] {
               currentStream.awaitTermination()
             }
             eventually("microbatch thread not stopped after termination with failure") {
@@ -563,7 +563,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         case e: ExpectException[_] =>
           val thrownException =
             withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
-              intercept[ContinuousQueryException] {
+              intercept[StreamingQueryException] {
                 failAfter(testTimeout) {
                   awaitTermFunc()
                 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 7f44227ec4..8681199817 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,8 +40,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {

   import testImplicits._

-
-
   test("simple count, update mode") {
     val inputData = MemoryStream[Int]

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 8e1de09a96..7f4d28cf05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -31,10 +31,10 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.util.JsonProtocol


-class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

   import testImplicits._
-  import ContinuousQueryListener._
+  import StreamingQueryListener._

   after {
     spark.streams.active.foreach(_.stop())
@@ -167,35 +167,35 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }

   test("QueryStarted serialization") {
-    val queryStartedInfo = new ContinuousQueryInfo(
+    val queryStartedInfo = new StreamingQueryInfo(
       "name",
       1,
       Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
       new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
-    val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
+    val queryStarted = new StreamingQueryListener.QueryStarted(queryStartedInfo)
     val json = JsonProtocol.sparkEventToJson(queryStarted)
     val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[ContinuousQueryListener.QueryStarted]
-    assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
+      .asInstanceOf[StreamingQueryListener.QueryStarted]
+    assertStreamingQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
   }

   test("QueryProgress serialization") {
-    val queryProcessInfo = new ContinuousQueryInfo(
+    val queryProcessInfo = new StreamingQueryInfo(
       "name",
       1,
       Seq(
         new SourceStatus("source1", Some(LongOffset(0).toString)),
         new SourceStatus("source2", Some(LongOffset(1).toString))),
       new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
-    val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo)
+    val queryProcess = new StreamingQueryListener.QueryProgress(queryProcessInfo)
     val json = JsonProtocol.sparkEventToJson(queryProcess)
     val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[ContinuousQueryListener.QueryProgress]
-    assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
+      .asInstanceOf[StreamingQueryListener.QueryProgress]
+    assertStreamingQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
   }

   test("QueryTerminated serialization") {
-    val queryTerminatedInfo = new ContinuousQueryInfo(
+    val queryTerminatedInfo = new StreamingQueryInfo(
       "name",
       1,
       Seq(
@@ -203,21 +203,21 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
         new SourceStatus("source2", Some(LongOffset(1).toString))),
       new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
     val exception = new RuntimeException("exception")
-    val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated(
+    val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
       queryTerminatedInfo,
       Some(exception.getMessage),
       exception.getStackTrace)
     val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[ContinuousQueryListener.QueryTerminated]
-    assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
+      .asInstanceOf[StreamingQueryListener.QueryTerminated]
+    assertStreamingQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }

-  private def assertContinuousQueryInfoEquals(
-      expected: ContinuousQueryInfo,
-      actual: ContinuousQueryInfo): Unit = {
+  private def assertStreamingQueryInfoEquals(
+      expected: StreamingQueryInfo,
+      actual: StreamingQueryInfo): Unit = {
     assert(expected.name === actual.name)
     assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
     expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
@@ -237,7 +237,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(expected.offsetDesc === actual.offsetDesc)
   }

-  private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
+  private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
     try {
       failAfter(1 minute) {
         spark.streams.addListener(listener)
@@ -248,23 +248,23 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }

-  private def addedListeners(): Array[ContinuousQueryListener] = {
+  private def addedListeners(): Array[StreamingQueryListener] = {
     val listenerBusMethod =
-      PrivateMethod[ContinuousQueryListenerBus]('listenerBus)
+      PrivateMethod[StreamingQueryListenerBus]('listenerBus)
     val listenerBus = spark.streams invokePrivate listenerBusMethod()
-    listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener])
+    listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
   }

-  class QueryStatusCollector extends ContinuousQueryListener {
+  class QueryStatusCollector extends StreamingQueryListener {
     // to catch errors in the async listener events
     @volatile private var asyncTestWaiter = new Waiter

-    @volatile var startStatus: ContinuousQueryInfo = null
-    @volatile var terminationStatus: ContinuousQueryInfo = null
+    @volatile var startStatus: StreamingQueryInfo = null
+    @volatile var terminationStatus: StreamingQueryInfo = null
     @volatile var terminationException: Option[String] = null
     @volatile var terminationStackTrace: Seq[StackTraceElement] = null

-    val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]
+    val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]

     def reset(): Unit = {
       startStatus = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index ef2fcbf73e..41ffd56cf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.util.Utils

-class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {

   import AwaitTerminationTester._
   import testImplicits._
@@ -215,7 +215,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {

   /** Run a body of code by defining a query on each dataset */
-  private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = {
+  private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
     failAfter(streamingTimeout) {
       val queries = withClue("Error starting queries") {
         datasets.zipWithIndex.map { case (ds, i) =>
@@ -269,7 +269,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
   }

   /** Stop a random active query either with `stop()` or with an error */
-  private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): ContinuousQuery = {
+  private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): StreamingQuery = {

     import scala.concurrent.ExecutionContext.Implicits.global
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index ad6bc27729..9d58315c20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, Me
 import org.apache.spark.util.Utils

-class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter {

   import AwaitTerminationTester._
   import testImplicits._
@@ -37,7 +37,7 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map { 6 / _}

-    def startQuery(queryName: String): ContinuousQuery = {
+    def startQuery(queryName: String): StreamingQuery = {
       val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
       val writer = mapped.writeStream
       writer
@@ -126,7 +126,7 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
   }

   /**
-   * A [[StreamAction]] to test the behavior of `ContinuousQuery.awaitTermination()`.
+   * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
    *
    * @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown)
    * @param timeoutMs Timeout in milliseconds
@@ -151,7 +151,7 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
   object TestAwaitTermination {

     /**
-     * Tests the behavior of `ContinuousQuery.awaitTermination`.
+     * Tests the behavior of `StreamingQuery.awaitTermination`.
      *
      * @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown)
      * @param timeoutMs Timeout in milliseconds
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index c6d374f754..1aee1934c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -27,7 +27,7 @@
 import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
@@ -265,7 +265,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {

   test("unique query names") {
     /** Start a query with a specific name */
-    def startQueryWithName(name: String = ""): ContinuousQuery = {
+    def startQueryWithName(name: String = ""): StreamingQuery = {
       spark.readStream
         .format("org.apache.spark.sql.streaming.test")
         .load("/test")
@@ -277,7 +277,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }

     /** Start a query without specifying a name */
-    def startQueryWithoutName(): ContinuousQuery = {
+    def startQueryWithoutName(): StreamingQuery = {
       spark.readStream
         .format("org.apache.spark.sql.streaming.test")
         .load("/test")
@@ -434,13 +434,13 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .load()

-    val cq = df.writeStream
+    val sq = df.writeStream
      .format("console")
      .option("checkpointLocation", newMetadataDir)
      .trigger(ProcessingTime(2.seconds))
      .start()

-    cq.awaitTermination(2000L)
+    sq.awaitTermination(2000L)
   }

   test("prevent all column partitioning") {
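
For reference, a minimal usage sketch (not part of this patch) of how the renamed public API exercised by the hunks above fits together: DataStreamWriter.start() returns a StreamingQuery (formerly ContinuousQuery), active queries are tracked by the StreamingQueryManager behind spark.streams, and a failed query surfaces as a StreamingQueryException from awaitTermination(). The input/checkpoint paths and the query name are made up for illustration; the calls themselves mirror those used in the test suites above.

import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery, StreamingQueryException}

object StreamingQueryRenameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StreamingQueryRenameExample")
      .getOrCreate()

    // Streaming Dataset over a directory of text files, as in FileStressSuite.
    val lines = spark.readStream.format("text").load("/tmp/stream-input")

    // start() now returns a StreamingQuery (formerly ContinuousQuery).
    val query: StreamingQuery = lines.writeStream
      .queryName("lines-to-console")  // must be unique among active queries
      .format("console")
      .option("checkpointLocation", "/tmp/stream-checkpoint")
      .trigger(ProcessingTime(2.seconds))
      .start()

    // spark.streams is the StreamingQueryManager (formerly ContinuousQueryManager).
    spark.streams.active.foreach(q => println(s"active query: ${q.name}"))

    try {
      // Block up to 10 seconds; a failed query rethrows its error wrapped in a
      // StreamingQueryException (formerly ContinuousQueryException).
      query.awaitTermination(10000L)
    } catch {
      case e: StreamingQueryException => println(s"query failed: ${e.getMessage}")
    } finally {
      query.stop()
      spark.stop()
    }
  }
}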