-rw-r--r-- python/pyspark/sql/context.py | 8
-rw-r--r-- python/pyspark/sql/dataframe.py | 2
-rw-r--r-- python/pyspark/sql/readwriter.py | 40
-rw-r--r-- python/pyspark/sql/session.py | 10
-rw-r--r-- python/pyspark/sql/streaming.py | 79
-rw-r--r-- python/pyspark/sql/tests.py | 52
-rw-r--r-- python/pyspark/sql/utils.py | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 26
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala) | 28
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 16
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala) | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala) | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala) | 12
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala) | 28
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala) | 38
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala | 6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala | 2
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala) | 50
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala) | 6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala) | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala | 10
33 files changed, 251 insertions, 254 deletions
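For orientation, a minimal Python sketch of the renamed query API as exercised by the doctests in this patch, assuming an active SparkSession `spark` and a streaming DataFrame `sdf`:

    sq = sdf.writeStream.format('memory').queryName('this_query').start()
    sq.isActive   # True while the StreamingQuery (formerly ContinuousQuery) is running
    sq.name       # u'this_query'
    sq.stop()     # terminate the query; sq.isActive becomes False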
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index a271afe4cf..8a1a874884 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -444,13 +444,13 @@ class SQLContext(object):
@property
@since(2.0)
def streams(self):
- """Returns a :class:`ContinuousQueryManager` that allows managing all the
- :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+ """Returns a :class:`StreamingQueryManager` that allows managing all the
+ :class:`StreamingQuery` StreamingQueries active on `this` context.
.. note:: Experimental.
"""
- from pyspark.sql.streaming import ContinuousQueryManager
- return ContinuousQueryManager(self._ssql_ctx.streams())
+ from pyspark.sql.streaming import StreamingQueryManager
+ return StreamingQueryManager(self._ssql_ctx.streams())
class HiveContext(SQLContext):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0126faf574..acf9d08b23 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -257,7 +257,7 @@ class DataFrame(object):
def isStreaming(self):
"""Returns true if this :class:`Dataset` contains one or more sources that continuously
return data as it arrives. A :class:`Dataset` that reads data from a streaming source
- must be executed as a :class:`ContinuousQuery` using the :func:`startStream` method in
+ must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in
:class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or
:func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
source present.
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ad954d0ad8..c982de6840 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
from pyspark.sql import utils
-__all__ = ["DataFrameReader", "DataFrameWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]
def to_str(value):
@@ -458,9 +458,9 @@ class DataFrameWriter(object):
self._spark = df.sql_ctx
self._jwrite = df._jdf.write()
- def _cq(self, jcq):
- from pyspark.sql.streaming import ContinuousQuery
- return ContinuousQuery(jcq)
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
@since(1.4)
def mode(self, saveMode):
@@ -1094,9 +1094,9 @@ class DataStreamWriter(object):
self._spark = df.sql_ctx
self._jwrite = df._jdf.writeStream()
- def _cq(self, jcq):
- from pyspark.sql.streaming import ContinuousQuery
- return ContinuousQuery(jcq)
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
@since(2.0)
def outputMode(self, outputMode):
@@ -1169,8 +1169,8 @@ class DataStreamWriter(object):
@since(2.0)
def queryName(self, queryName):
- """Specifies the name of the :class:`ContinuousQuery` that can be started with
- :func:`startStream`. This name must be unique among all the currently active queries
+ """Specifies the name of the :class:`StreamingQuery` that can be started with
+ :func:`start`. This name must be unique among all the currently active queries
in the associated SparkSession.
.. note:: Experimental.
@@ -1232,21 +1232,21 @@ class DataStreamWriter(object):
:param options: All other string options. You may want to provide a `checkpointLocation`
for most streams, however it is not required for a `memory` stream.
- >>> cq = sdf.writeStream.format('memory').queryName('this_query').start()
- >>> cq.isActive
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.isActive
True
- >>> cq.name
+ >>> sq.name
u'this_query'
- >>> cq.stop()
- >>> cq.isActive
+ >>> sq.stop()
+ >>> sq.isActive
False
- >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+ >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
... queryName='that_query', format='memory')
- >>> cq.name
+ >>> sq.name
u'that_query'
- >>> cq.isActive
+ >>> sq.isActive
True
- >>> cq.stop()
+ >>> sq.stop()
"""
self.options(**options)
if partitionBy is not None:
@@ -1256,9 +1256,9 @@ class DataStreamWriter(object):
if queryName is not None:
self.queryName(queryName)
if path is None:
- return self._cq(self._jwrite.start())
+ return self._sq(self._jwrite.start())
else:
- return self._cq(self._jwrite.start(path))
+ return self._sq(self._jwrite.start(path))
def _test():
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 11c815dd94..6edbd59856 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -565,15 +565,15 @@ class SparkSession(object):
@property
@since(2.0)
def streams(self):
- """Returns a :class:`ContinuousQueryManager` that allows managing all the
- :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+ """Returns a :class:`StreamingQueryManager` that allows managing all the
+ :class:`StreamingQuery` StreamingQueries active on `this` context.
.. note:: Experimental.
- :return: :class:`ContinuousQueryManager`
+ :return: :class:`StreamingQueryManager`
"""
- from pyspark.sql.streaming import ContinuousQueryManager
- return ContinuousQueryManager(self._jsparkSession.streams())
+ from pyspark.sql.streaming import StreamingQueryManager
+ return StreamingQueryManager(self._jsparkSession.streams())
@since(2.0)
def stop(self):
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1d650946be..ae45c99e4f 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod
from pyspark import since
from pyspark.rdd import ignore_unicode_prefix
-__all__ = ["ContinuousQuery"]
+__all__ = ["StreamingQuery"]
-class ContinuousQuery(object):
+class StreamingQuery(object):
"""
A handle to a query that is executing continuously in the background as new data arrives.
All these methods are thread-safe.
@@ -39,30 +39,30 @@ class ContinuousQuery(object):
.. versionadded:: 2.0
"""
- def __init__(self, jcq):
- self._jcq = jcq
+ def __init__(self, jsq):
+ self._jsq = jsq
@property
@since(2.0)
def id(self):
- """The id of the continuous query. This id is unique across all queries that have been
+ """The id of the streaming query. This id is unique across all queries that have been
started in the current process.
"""
- return self._jcq.id()
+ return self._jsq.id()
@property
@since(2.0)
def name(self):
- """The name of the continuous query. This name is unique across all active queries.
+ """The name of the streaming query. This name is unique across all active queries.
"""
- return self._jcq.name()
+ return self._jsq.name()
@property
@since(2.0)
def isActive(self):
- """Whether this continuous query is currently active or not.
+ """Whether this streaming query is currently active or not.
"""
- return self._jcq.isActive()
+ return self._jsq.isActive()
@since(2.0)
def awaitTermination(self, timeout=None):
@@ -75,14 +75,14 @@ class ContinuousQuery(object):
immediately (if the query was terminated by :func:`stop()`), or throw the exception
immediately (if the query has terminated with exception).
- throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+ throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jcq.awaitTermination(int(timeout * 1000))
+ return self._jsq.awaitTermination(int(timeout * 1000))
else:
- return self._jcq.awaitTermination()
+ return self._jsq.awaitTermination()
@since(2.0)
def processAllAvailable(self):
@@ -92,26 +92,25 @@ class ContinuousQuery(object):
until data that has been synchronously appended to a stream source prior to invocation.
(i.e. `getOffset` must immediately reflect the addition).
"""
- return self._jcq.processAllAvailable()
+ return self._jsq.processAllAvailable()
@since(2.0)
def stop(self):
- """Stop this continuous query.
+ """Stop this streaming query.
"""
- self._jcq.stop()
+ self._jsq.stop()
-class ContinuousQueryManager(object):
- """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active
- on a :class:`SQLContext`.
+class StreamingQueryManager(object):
+ """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
.. note:: Experimental
.. versionadded:: 2.0
"""
- def __init__(self, jcqm):
- self._jcqm = jcqm
+ def __init__(self, jsqm):
+ self._jsqm = jsqm
@property
@ignore_unicode_prefix
@@ -119,14 +118,14 @@ class ContinuousQueryManager(object):
def active(self):
"""Returns a list of active queries associated with this SQLContext
- >>> cq = df.writeStream.format('memory').queryName('this_query').start()
- >>> cqm = spark.streams
- >>> # get the list of active continuous queries
- >>> [q.name for q in cqm.active]
+ >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sqm = spark.streams
+ >>> # get the list of active streaming queries
+ >>> [q.name for q in sqm.active]
[u'this_query']
- >>> cq.stop()
+ >>> sq.stop()
"""
- return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+ return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
@ignore_unicode_prefix
@since(2.0)
@@ -134,20 +133,20 @@ class ContinuousQueryManager(object):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> cq = df.writeStream.format('memory').queryName('this_query').start()
- >>> cq.name
+ >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.name
u'this_query'
- >>> cq = spark.streams.get(cq.id)
- >>> cq.isActive
+ >>> sq = spark.streams.get(sq.id)
+ >>> sq.isActive
True
- >>> cq = sqlContext.streams.get(cq.id)
- >>> cq.isActive
+ >>> sq = sqlContext.streams.get(sq.id)
+ >>> sq.isActive
True
- >>> cq.stop()
+ >>> sq.stop()
"""
if not isinstance(id, intlike):
raise ValueError("The id for the query must be an integer. Got: %s" % id)
- return ContinuousQuery(self._jcqm.get(id))
+ return StreamingQuery(self._jsqm.get(id))
@since(2.0)
def awaitAnyTermination(self, timeout=None):
@@ -168,14 +167,14 @@ class ContinuousQueryManager(object):
queries, users need to stop all of them after any of them terminates with exception, and
then check the `query.exception()` for each query.
- throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
+ throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jcqm.awaitAnyTermination(int(timeout * 1000))
+ return self._jsqm.awaitAnyTermination(int(timeout * 1000))
else:
- return self._jcqm.awaitAnyTermination()
+ return self._jsqm.awaitAnyTermination()
@since(2.0)
def resetTerminated(self):
@@ -184,11 +183,11 @@ class ContinuousQueryManager(object):
>>> spark.streams.resetTerminated()
"""
- self._jcqm.resetTerminated()
+ self._jsqm.resetTerminated()
class Trigger(object):
- """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.
+ """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
.. note:: Experimental
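A minimal sketch of the renamed manager API covered by this file, again assuming an active session `spark` and a running query `sq`:

    sqm = spark.streams                  # StreamingQueryManager (formerly ContinuousQueryManager)
    [q.name for q in sqm.active]         # names of all active streaming queries
    same_q = sqm.get(sq.id)              # look a query up by its integer id
    sqm.awaitAnyTermination(timeout=5)   # block until any query terminates, or 5 seconds elapse
    sqm.resetTerminated()                # forget past terminated queries so the wait can be reused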
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fee960a1a7..1d5d691696 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase):
def test_stream_save_options(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+ q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
.format('parquet').outputMode('append').option('path', out).start()
try:
- self.assertEqual(cq.name, 'this_query')
- self.assertTrue(cq.isActive)
- cq.processAllAvailable()
+ self.assertEqual(q.name, 'this_query')
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_stream_save_options_overwrite(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
@@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase):
chk = os.path.join(tmpPath, 'chk')
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
- cq = df.writeStream.option('checkpointLocation', fake1)\
+ q = df.writeStream.option('checkpointLocation', fake1)\
.format('memory').option('path', fake2) \
.queryName('fake_query').outputMode('append') \
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertEqual(cq.name, 'this_query')
- self.assertTrue(cq.isActive)
- cq.processAllAvailable()
+ self.assertEqual(q.name, 'this_query')
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if not f.startswith('.')])
@@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase):
self.assertFalse(os.path.isdir(fake1)) # should not have been created
self.assertFalse(os.path.isdir(fake2)) # should not have been created
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_stream_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream\
+ q = df.writeStream\
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertTrue(cq.isActive)
+ self.assertTrue(q.isActive)
try:
- cq.awaitTermination("hello")
+ q.awaitTermination("hello")
self.fail("Expected a value exception")
except ValueError:
pass
now = time.time()
# test should take at least 2 seconds
- res = cq.awaitTermination(2.6)
+ res = q.awaitTermination(2.6)
duration = time.time() - now
self.assertTrue(duration >= 2)
self.assertFalse(res)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream\
+ q = df.writeStream\
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertTrue(cq.isActive)
+ self.assertTrue(q.isActive)
try:
self.spark._wrapped.streams.awaitAnyTermination("hello")
self.fail("Expected a value exception")
@@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(duration >= 2)
self.assertFalse(res)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_help_command(self):
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 9ddaf78acf..2a85ec01bc 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -45,9 +45,9 @@ class IllegalArgumentException(CapturedException):
"""
-class ContinuousQueryException(CapturedException):
+class StreamingQueryException(CapturedException):
"""
- Exception that stopped a :class:`ContinuousQuery`.
+ Exception that stopped a :class:`StreamingQuery`.
"""
@@ -71,8 +71,8 @@ def capture_sql_exception(f):
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
raise ParseException(s.split(': ', 1)[1], stackTrace)
- if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
- raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
+ if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
+ raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
if s.startswith('java.lang.IllegalArgumentException: '):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f9db325ea2..fba4066af6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
-import org.apache.spark.sql.streaming.{ContinuousQuery, DataStreamWriter}
+import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -455,7 +455,7 @@ class Dataset[T] private[sql](
/**
* Returns true if this Dataset contains one or more sources that continuously
* return data as it arrives. A Dataset that reads data from a streaming source
- * must be executed as a [[ContinuousQuery]] using the `startStream()` method in
+ * must be executed as a [[StreamingQuery]] using the `startStream()` method in
* [[DataFrameWriter]]. Methods that return a single answer, e.g. `count()` or
* `collect()`, will throw an [[AnalysisException]] when there is a streaming
* source present.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 09f07426a6..f56b25b557 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -18,11 +18,11 @@
package org.apache.spark.sql
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.ContinuousQuery
+import org.apache.spark.sql.streaming.StreamingQuery
/**
* :: Experimental ::
- * A class to consume data generated by a [[ContinuousQuery]]. Typically this is used to send the
+ * A class to consume data generated by a [[StreamingQuery]]. Typically this is used to send the
* generated data to external systems. Each partition will use a new deserialized instance, so you
* usually should do all the initialization (e.g. opening a connection or initiating a transaction)
* in the `open` method.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 33f62915df..e7627ac2c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.streaming.{ContinuousQueryManager, DataStreamReader}
+import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -716,12 +716,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
}
/**
- * Returns a [[ContinuousQueryManager]] that allows managing all the
- * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context.
+ * Returns a [[StreamingQueryManager]] that allows managing all the
+ * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active on `this` context.
*
* @since 2.0.0
*/
- def streams: ContinuousQueryManager = sparkSession.streams
+ def streams: StreamingQueryManager = sparkSession.streams
/**
* Returns the names of tables in the current database as an array.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 9137a735dd..251f47d5fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -178,13 +178,13 @@ class SparkSession private(
/**
* :: Experimental ::
- * Returns a [[ContinuousQueryManager]] that allows managing all the
- * [[ContinuousQuery ContinuousQueries]] active on `this`.
+ * Returns a [[StreamingQueryManager]] that allows managing all the
+ * [[StreamingQuery StreamingQueries]] active on `this`.
*
* @since 2.0.0
*/
@Experimental
- def streams: ContinuousQueryManager = sessionState.continuousQueryManager
+ def streams: StreamingQueryManager = sessionState.streamingQueryManager
/**
* Start a new session with isolated SQL configurations, temporary tables, registered
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 31c9f1aef2..6cb1a44a20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -47,7 +47,7 @@ private[sql] object SQLExecution {
val r = try {
// sparkContext.getCallSite() would first try to pick up any call site that was previously
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
- // continuous queries would give us call site like "run at <unknown>:0"
+ // streaming queries would give us call site like "run at <unknown>:0"
val callSite = sparkSession.sparkContext.getCallSite()
sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d1261dd6ca..60466e2830 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.ContinuousQuery
+import org.apache.spark.sql.streaming.StreamingQuery
/**
* Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -225,7 +225,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
/**
* Used to plan aggregation queries that are computed incrementally as part of a
- * [[ContinuousQuery]]. Currently this rule is injected into the planner
+ * [[StreamingQuery]]. Currently this rule is injected into the planner
* on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]]
*/
object StatefulAggregationStrategy extends Strategy {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5095fe7fca..4aefd39b36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -52,9 +52,9 @@ class StreamExecution(
val trigger: Trigger,
private[sql] val triggerClock: Clock,
val outputMode: OutputMode)
- extends ContinuousQuery with Logging {
+ extends StreamingQuery with Logging {
- import org.apache.spark.sql.streaming.ContinuousQueryListener._
+ import org.apache.spark.sql.streaming.StreamingQueryListener._
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
@@ -101,7 +101,7 @@ class StreamExecution(
private[sql] var lastExecution: QueryExecution = null
@volatile
- private[sql] var streamDeathCause: ContinuousQueryException = null
+ private[sql] var streamDeathCause: StreamingQueryException = null
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()
@@ -140,8 +140,8 @@ class StreamExecution(
override def sinkStatus: SinkStatus =
new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString)
- /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
- override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
+ /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
+ override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
/** Returns the path of a file with `name` in the checkpoint directory. */
private def checkpointFile(name: String): String =
@@ -199,7 +199,7 @@ class StreamExecution(
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
case NonFatal(e) =>
- streamDeathCause = new ContinuousQueryException(
+ streamDeathCause = new StreamingQueryException(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
@@ -227,7 +227,7 @@ class StreamExecution(
private def populateStartOffsets(): Unit = {
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
- logInfo(s"Resuming continuous query, starting with batch $batchId")
+ logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
logDebug(s"Found possibly uncommitted offsets $availableOffsets")
@@ -239,7 +239,7 @@ class StreamExecution(
}
case None => // We are starting this stream for the first time.
- logInfo(s"Starting new continuous query.")
+ logInfo(s"Starting new streaming query.")
currentBatchId = 0
constructNextBatch()
}
@@ -383,7 +383,7 @@ class StreamExecution(
postEvent(new QueryProgress(this.toInfo))
}
- private def postEvent(event: ContinuousQueryListener.Event) {
+ private def postEvent(event: StreamingQueryListener.Event) {
sparkSession.streams.postListenerEvent(event)
}
@@ -468,7 +468,7 @@ class StreamExecution(
}
override def toString: String = {
- s"Continuous Query - $name [state = $state]"
+ s"Streaming Query - $name [state = $state]"
}
def toDebugString: String = {
@@ -476,7 +476,7 @@ class StreamExecution(
"Error:\n" + stackTraceToString(streamDeathCause.cause)
} else ""
s"""
- |=== Continuous Query ===
+ |=== Streaming Query ===
|Name: $name
|Current Offsets: $committedOffsets
|
@@ -490,8 +490,8 @@ class StreamExecution(
""".stripMargin
}
- private def toInfo: ContinuousQueryInfo = {
- new ContinuousQueryInfo(
+ private def toInfo: StreamingQueryInfo = {
+ new StreamingQueryInfo(
this.name,
this.id,
this.sourceStatuses,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index f50951f9bd..1e663956f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -18,27 +18,27 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.streaming.ContinuousQueryListener
+import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.util.ListenerBus
/**
- * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received
- * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with
- * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them
- * to ContinuousQueryListener.
+ * A bus to forward events to [[StreamingQueryListener]]s. This one will send received
+ * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with
+ * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
+ * to StreamingQueryListener.
*/
-class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
- extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+ extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
- import ContinuousQueryListener._
+ import StreamingQueryListener._
sparkListenerBus.addListener(this)
/**
- * Post a ContinuousQueryListener event to the Spark listener bus asynchronously. This event will
- * be dispatched to all ContinuousQueryListener in the thread of the Spark listener bus.
+ * Post a StreamingQueryListener event to the Spark listener bus asynchronously. This event will
+ * be dispatched to all StreamingQueryListener in the thread of the Spark listener bus.
*/
- def post(event: ContinuousQueryListener.Event) {
+ def post(event: StreamingQueryListener.Event) {
event match {
case s: QueryStarted =>
postToAll(s)
@@ -49,15 +49,15 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case e: ContinuousQueryListener.Event =>
+ case e: StreamingQueryListener.Event =>
postToAll(e)
case _ =>
}
}
override protected def doPostEvent(
- listener: ContinuousQueryListener,
- event: ContinuousQueryListener.Event): Unit = {
+ listener: StreamingQueryListener,
+ event: StreamingQueryListener.Event): Unit = {
event match {
case queryStarted: QueryStarted =>
listener.onQueryStarted(queryStarted)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 02608b0dce..e8bd489be3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2587,7 +2587,7 @@ object functions {
* 09:00:25-09:01:25 ...
* }}}
*
- * For a continuous query, you may use the function `current_timestamp` to generate windows on
+ * For a streaming query, you may use the function `current_timestamp` to generate windows on
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
@@ -2641,7 +2641,7 @@ object functions {
* 09:00:20-09:01:20 ...
* }}}
*
- * For a continuous query, you may use the function `current_timestamp` to generate windows on
+ * For a streaming query, you may use the function `current_timestamp` to generate windows on
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
@@ -2683,7 +2683,7 @@ object functions {
* 09:02:00-09:03:00 ...
* }}}
*
- * For a continuous query, you may use the function `current_timestamp` to generate windows on
+ * For a streaming query, you may use the function `current_timestamp` to generate windows on
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6978b506ac..4b8916f59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -484,14 +484,14 @@ object SQLConf {
.createWithDefault(2)
val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
- .doc("The default location for storing checkpoint data for continuously executing queries.")
+ .doc("The default location for storing checkpoint data for streaming queries.")
.stringConf
.createOptional
val UNSUPPORTED_OPERATION_CHECK_ENABLED =
SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
.internal()
- .doc("When true, the logical plan for continuous query will be checked for unsupported" +
+ .doc("When true, the logical plan for streaming query will be checked for unsupported" +
" operations.")
.booleanConf
.createWithDefault(true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index b43095041b..59efa81275 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager}
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -143,10 +143,10 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
/**
- * Interface to start and stop [[ContinuousQuery]]s.
+ * Interface to start and stop [[StreamingQuery]]s.
*/
- lazy val continuousQueryManager: ContinuousQueryManager = {
- new ContinuousQueryManager(sparkSession)
+ lazy val streamingQueryManager: StreamingQueryManager = {
+ new StreamingQueryManager(sparkSession)
}
private val jarClassLoader: NonClosableMutableURLClassLoader =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b035ff7938..197707404e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -109,7 +109,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
* :: Experimental ::
- * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
+ * Specifies the name of the [[StreamingQuery]] that can be started with `startStream()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
@@ -221,26 +221,26 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
- * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
@Experimental
- def start(path: String): ContinuousQuery = {
+ def start(path: String): StreamingQuery = {
option("path", path).start()
}
/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
- * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
@Experimental
- def start(): ContinuousQuery = {
+ def start(): StreamingQuery = {
if (source == "memory") {
assertNotPartitioned("memory")
if (extraOptions.get("queryName").isEmpty) {
@@ -249,7 +249,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
- val query = df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
extraOptions.get("checkpointLocation"),
df,
@@ -263,7 +263,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
} else if (source == "foreach") {
assertNotPartitioned("foreach")
val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
- df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
extraOptions.get("checkpointLocation"),
df,
@@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
className = source,
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
- df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
extraOptions.get("checkpointLocation"),
df,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 1e0a47deca..dc81a5b180 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SparkSession
* @since 2.0.0
*/
@Experimental
-trait ContinuousQuery {
+trait StreamingQuery {
/**
* Returns the name of the query. This name is unique across all active queries. This can be
@@ -57,10 +57,10 @@ trait ContinuousQuery {
def isActive: Boolean
/**
- * Returns the [[ContinuousQueryException]] if the query was terminated by an exception.
+ * Returns the [[StreamingQueryException]] if the query was terminated by an exception.
* @since 2.0.0
*/
- def exception: Option[ContinuousQueryException]
+ def exception: Option[StreamingQueryException]
/**
* Returns current status of all the sources.
@@ -79,7 +79,7 @@ trait ContinuousQuery {
* immediately (if the query was terminated by `stop()`), or throw the exception
* immediately (if the query has terminated with exception).
*
- * @throws ContinuousQueryException, if `this` query has terminated with an exception.
+ * @throws StreamingQueryException, if `this` query has terminated with an exception.
*
* @since 2.0.0
*/
@@ -95,7 +95,7 @@ trait ContinuousQuery {
* `true` immediately (if the query was terminated by `stop()`), or throw the exception
* immediately (if the query has terminated with exception).
*
- * @throws ContinuousQueryException, if `this` query has terminated with an exception
+ * @throws StreamingQueryException, if `this` query has terminated with an exception
*
* @since 2.0.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 5196c5a537..90f95ca9d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
/**
* :: Experimental ::
- * Exception that stopped a [[ContinuousQuery]].
+ * Exception that stopped a [[StreamingQuery]].
* @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
@@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
* @since 2.0.0
*/
@Experimental
-class ContinuousQueryException private[sql](
- @transient val query: ContinuousQuery,
+class StreamingQueryException private[sql](
+ @transient val query: StreamingQuery,
val message: String,
val cause: Throwable,
val startOffset: Option[Offset] = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
index 19f22704ba..1af2668817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
@@ -21,16 +21,16 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
- * A class used to report information about the progress of a [[ContinuousQuery]].
+ * A class used to report information about the progress of a [[StreamingQuery]].
*
- * @param name The [[ContinuousQuery]] name. This name is unique across all active queries.
- * @param id The [[ContinuousQuery]] id. This id is unique across
+ * @param name The [[StreamingQuery]] name. This name is unique across all active queries.
+ * @param id The [[StreamingQuery]] id. This id is unique across
* all queries that have been started in the current process.
- * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
- * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
+ * @param sourceStatuses The current statuses of the [[StreamingQuery]]'s sources.
+ * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
*/
@Experimental
-class ContinuousQueryInfo private[sql](
+class StreamingQueryInfo private[sql](
val name: String,
val id: Long,
val sourceStatuses: Seq[SourceStatus],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index dd311148e0..c43de58faa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -22,22 +22,22 @@ import org.apache.spark.scheduler.SparkListenerEvent
/**
* :: Experimental ::
- * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]].
+ * Interface for listening to events related to [[StreamingQuery StreamingQueries]].
* @note The methods are not thread-safe as they may be called from different threads.
*
* @since 2.0.0
*/
@Experimental
-abstract class ContinuousQueryListener {
+abstract class StreamingQueryListener {
- import ContinuousQueryListener._
+ import StreamingQueryListener._
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]],
* that is, `onQueryStart` will be called on all listeners before
- * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
+ * `DataFrameWriter.startStream()` returns the corresponding [[StreamingQuery]]. Please
* don't block this method as it will block your query.
* @since 2.0.0
*/
@@ -46,9 +46,9 @@ abstract class ContinuousQueryListener {
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
- * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be
- * latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]]
- * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
+ * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
+ * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
+ * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
* is terminated when you are processing [[QueryProgress]].
* @since 2.0.0
*/
@@ -64,15 +64,15 @@ abstract class ContinuousQueryListener {
/**
* :: Experimental ::
- * Companion object of [[ContinuousQueryListener]] that defines the listener events.
+ * Companion object of [[StreamingQueryListener]] that defines the listener events.
* @since 2.0.0
*/
@Experimental
-object ContinuousQueryListener {
+object StreamingQueryListener {
/**
* :: Experimental ::
- * Base type of [[ContinuousQueryListener]] events
+ * Base type of [[StreamingQueryListener]] events
* @since 2.0.0
*/
@Experimental
@@ -84,7 +84,7 @@ object ContinuousQueryListener {
* @since 2.0.0
*/
@Experimental
- class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event
+ class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends Event
/**
* :: Experimental ::
@@ -92,14 +92,14 @@ object ContinuousQueryListener {
* @since 2.0.0
*/
@Experimental
- class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event
+ class QueryProgress private[sql](val queryInfo: StreamingQueryInfo) extends Event
/**
* :: Experimental ::
* Event representing the termination of a query
*
* @param queryInfo Information about the status of the query.
- * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated
+ * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @param stackTrace The stack trace of the exception if the query was terminated with an
* exception. It will be empty if there was no error.
@@ -107,7 +107,7 @@ object ContinuousQueryListener {
*/
@Experimental
class QueryTerminated private[sql](
- val queryInfo: ContinuousQueryInfo,
+ val queryInfo: StreamingQueryInfo,
val exception: Option[String],
val stackTrace: Seq[StackTraceElement]) extends Event
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 0f4a9c9975..bae7f56a23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -31,28 +31,28 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* :: Experimental ::
- * A class to manage all the [[ContinuousQuery]] active on a [[SparkSession]].
+ * A class to manage all the [[StreamingQuery]] active on a [[SparkSession]].
*
* @since 2.0.0
*/
@Experimental
-class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
- private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
- private val activeQueries = new mutable.HashMap[Long, ContinuousQuery]
+ private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+ private val activeQueries = new mutable.HashMap[Long, StreamingQuery]
private val activeQueriesLock = new Object
private val awaitTerminationLock = new Object
- private var lastTerminatedQuery: ContinuousQuery = null
+ private var lastTerminatedQuery: StreamingQuery = null
/**
* Returns a list of active queries associated with this SQLContext
*
* @since 2.0.0
*/
- def active: Array[ContinuousQuery] = activeQueriesLock.synchronized {
+ def active: Array[StreamingQuery] = activeQueriesLock.synchronized {
activeQueries.values.toArray
}
@@ -61,7 +61,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
*
* @since 2.0.0
*/
- def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized {
+ def get(id: Long): StreamingQuery = activeQueriesLock.synchronized {
activeQueries.get(id).orNull
}
@@ -81,7 +81,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
* users need to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
- * @throws ContinuousQueryException, if any query has terminated with an exception
+ * @throws StreamingQueryException, if any query has terminated with an exception
*
* @since 2.0.0
*/
@@ -113,7 +113,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
* users need to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
- * @throws ContinuousQueryException, if any query has terminated with an exception
+ * @throws StreamingQueryException, if any query has terminated with an exception
*
* @since 2.0.0
*/
@@ -146,31 +146,31 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
}
/**
- * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of
- * [[ContinuousQuery]].
+ * Register a [[StreamingQueryListener]] to receive up-calls for life cycle events of
+ * [[StreamingQuery]].
*
* @since 2.0.0
*/
- def addListener(listener: ContinuousQueryListener): Unit = {
+ def addListener(listener: StreamingQueryListener): Unit = {
listenerBus.addListener(listener)
}
/**
- * Deregister a [[ContinuousQueryListener]].
+ * Deregister a [[StreamingQueryListener]].
*
* @since 2.0.0
*/
- def removeListener(listener: ContinuousQueryListener): Unit = {
+ def removeListener(listener: StreamingQueryListener): Unit = {
listenerBus.removeListener(listener)
}
/** Post a listener event */
- private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = {
+ private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = {
listenerBus.post(event)
}
/**
- * Start a [[ContinuousQuery]].
+ * Start a [[StreamingQuery]].
* @param userSpecifiedName Query name optionally specified by the user.
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
* @param df Streaming DataFrame.
@@ -193,7 +193,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
useTempCheckpointLocation: Boolean = false,
recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
- triggerClock: Clock = new SystemClock()): ContinuousQuery = {
+ triggerClock: Clock = new SystemClock()): StreamingQuery = {
activeQueriesLock.synchronized {
val id = StreamExecution.nextId
val name = userSpecifiedName.getOrElse(s"query-$id")
@@ -264,8 +264,8 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
}
}
- /** Notify (by the ContinuousQuery) that the query has been terminated */
- private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = {
+ /** Notify (by the StreamingQuery) that the query has been terminated */
+ private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
activeQueriesLock.synchronized {
activeQueries -= terminatedQuery.id
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
index d3fdbac576..55be7a711a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
@@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
/**
* :: Experimental ::
- * Used to indicate how often results should be produced by a [[ContinuousQuery]].
+ * Used to indicate how often results should be produced by a [[StreamingQuery]].
*
* @since 2.0.0
*/
@@ -65,7 +65,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
/**
* :: Experimental ::
- * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s.
+ * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s.
*
* @since 2.0.0
*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index a5acc970e3..9d0a2b3d5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -124,7 +124,7 @@ class FileStreamSinkSuite extends StreamTest {
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
- var query: ContinuousQuery = null
+ var query: StreamingQuery = null
try {
query =
@@ -156,7 +156,7 @@ class FileStreamSinkSuite extends StreamTest {
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
- var query: ContinuousQuery = null
+ var query: StreamingQuery = null
try {
query =
@@ -240,7 +240,7 @@ class FileStreamSinkSuite extends StreamTest {
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
- var query: ContinuousQuery = null
+ var query: StreamingQuery = null
try {
val writer =
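
The three hunks above share the same start/stop pattern; a self-contained sketch of it (the text source, parquet sink, and the active SparkSession named `spark` are illustrative assumptions, not taken verbatim from the suite):

    import java.nio.file.Files
    import org.apache.spark.sql.streaming.StreamingQuery

    // assumes an active SparkSession named `spark`
    val inputDir = Files.createTempDirectory("stream.input").toString
    val outputDir = Files.createTempDirectory("stream.output").toString
    val checkpointDir = Files.createTempDirectory("stream.checkpoint").toString

    var query: StreamingQuery = null
    try {
      query = spark.readStream
        .format("text")
        .load(inputDir)
        .writeStream
        .option("checkpointLocation", checkpointDir)
        .format("parquet")
        .start(outputDir)
      query.processAllAvailable()   // drain whatever input is currently available
    } finally {
      if (query != null) query.stop()
    }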
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 0e157cf726..f9e236c449 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -57,7 +57,7 @@ class FileStressSuite extends StreamTest {
@volatile
var continue = true
@volatile
- var stream: ContinuousQuery = null
+ var stream: StreamingQuery = null
val writer = new Thread("stream writer") {
override def run(): Unit = {
@@ -100,7 +100,7 @@ class FileStressSuite extends StreamTest {
val input = spark.readStream.format("text").load(inputDir)
- def startStream(): ContinuousQuery = {
+ def startStream(): StreamingQuery = {
val output = input
.repartition(5)
.as[String]
@@ -139,7 +139,7 @@ class FileStressSuite extends StreamTest {
try {
stream.awaitTermination()
} catch {
- case ce: ContinuousQueryException =>
+ case ce: StreamingQueryException =>
failures += 1
}
}
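
The renamed exception is what a caller sees when a started query fails; a brief sketch (the `query` handle is hypothetical):

    import org.apache.spark.sql.streaming.StreamingQueryException

    try {
      query.awaitTermination()                 // blocks until stop() or a failure
    } catch {
      case e: StreamingQueryException =>
        // the stress test above counts these and restarts from the same checkpoint
        println(s"query failed: ${e.getMessage}")
    }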
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index cbfa6ff07d..720ffaf732 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -353,7 +353,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case ef: ExpectFailure[_] =>
verify(currentStream != null, "can not expect failure when stream is not running")
try failAfter(streamingTimeout) {
- val thrownException = intercept[ContinuousQueryException] {
+ val thrownException = intercept[StreamingQueryException] {
currentStream.awaitTermination()
}
eventually("microbatch thread not stopped after termination with failure") {
@@ -563,7 +563,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case e: ExpectException[_] =>
val thrownException =
withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
- intercept[ContinuousQueryException] {
+ intercept[StreamingQueryException] {
failAfter(testTimeout) {
awaitTermFunc()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 7f44227ec4..8681199817 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,8 +40,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
import testImplicits._
-
-
test("simple count, update mode") {
val inputData = MemoryStream[Int]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 8e1de09a96..7f4d28cf05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -31,10 +31,10 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.util.JsonProtocol
-class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
import testImplicits._
- import ContinuousQueryListener._
+ import StreamingQueryListener._
after {
spark.streams.active.foreach(_.stop())
@@ -167,35 +167,35 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
test("QueryStarted serialization") {
- val queryStartedInfo = new ContinuousQueryInfo(
+ val queryStartedInfo = new StreamingQueryInfo(
"name",
1,
Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
- val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
+ val queryStarted = new StreamingQueryListener.QueryStarted(queryStartedInfo)
val json = JsonProtocol.sparkEventToJson(queryStarted)
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
- .asInstanceOf[ContinuousQueryListener.QueryStarted]
- assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
+ .asInstanceOf[StreamingQueryListener.QueryStarted]
+ assertStreamingQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
}
test("QueryProgress serialization") {
- val queryProcessInfo = new ContinuousQueryInfo(
+ val queryProcessInfo = new StreamingQueryInfo(
"name",
1,
Seq(
new SourceStatus("source1", Some(LongOffset(0).toString)),
new SourceStatus("source2", Some(LongOffset(1).toString))),
new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
- val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo)
+ val queryProcess = new StreamingQueryListener.QueryProgress(queryProcessInfo)
val json = JsonProtocol.sparkEventToJson(queryProcess)
val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
- .asInstanceOf[ContinuousQueryListener.QueryProgress]
- assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
+ .asInstanceOf[StreamingQueryListener.QueryProgress]
+ assertStreamingQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
}
test("QueryTerminated serialization") {
- val queryTerminatedInfo = new ContinuousQueryInfo(
+ val queryTerminatedInfo = new StreamingQueryInfo(
"name",
1,
Seq(
@@ -203,21 +203,21 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
new SourceStatus("source2", Some(LongOffset(1).toString))),
new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
val exception = new RuntimeException("exception")
- val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated(
+ val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
queryTerminatedInfo,
Some(exception.getMessage),
exception.getStackTrace)
val json =
JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
- .asInstanceOf[ContinuousQueryListener.QueryTerminated]
- assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
+ .asInstanceOf[StreamingQueryListener.QueryTerminated]
+ assertStreamingQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
- private def assertContinuousQueryInfoEquals(
- expected: ContinuousQueryInfo,
- actual: ContinuousQueryInfo): Unit = {
+ private def assertStreamingQueryInfoEquals(
+ expected: StreamingQueryInfo,
+ actual: StreamingQueryInfo): Unit = {
assert(expected.name === actual.name)
assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
@@ -237,7 +237,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(expected.offsetDesc === actual.offsetDesc)
}
- private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
+ private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
try {
failAfter(1 minute) {
spark.streams.addListener(listener)
@@ -248,23 +248,23 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
- private def addedListeners(): Array[ContinuousQueryListener] = {
+ private def addedListeners(): Array[StreamingQueryListener] = {
val listenerBusMethod =
- PrivateMethod[ContinuousQueryListenerBus]('listenerBus)
+ PrivateMethod[StreamingQueryListenerBus]('listenerBus)
val listenerBus = spark.streams invokePrivate listenerBusMethod()
- listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener])
+ listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
}
- class QueryStatusCollector extends ContinuousQueryListener {
+ class QueryStatusCollector extends StreamingQueryListener {
// to catch errors in the async listener events
@volatile private var asyncTestWaiter = new Waiter
- @volatile var startStatus: ContinuousQueryInfo = null
- @volatile var terminationStatus: ContinuousQueryInfo = null
+ @volatile var startStatus: StreamingQueryInfo = null
+ @volatile var terminationStatus: StreamingQueryInfo = null
@volatile var terminationException: Option[String] = null
@volatile var terminationStackTrace: Seq[StackTraceElement] = null
- val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]
+ val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]
def reset(): Unit = {
startStatus = null
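
The three serialization tests above share one round-trip shape; a compact sketch of it as a suite-local helper (JsonProtocol is spark-private, so this only compiles inside org.apache.spark packages, as the suite itself does):

    import org.apache.spark.scheduler.SparkListenerEvent
    import org.apache.spark.util.JsonProtocol

    // Serialize a listener event to JSON and parse it back, as the assertions above expect.
    def roundTrip[T <: SparkListenerEvent](event: T): T = {
      val json = JsonProtocol.sparkEventToJson(event)
      JsonProtocol.sparkEventFromJson(json).asInstanceOf[T]
    }

    // e.g. roundTrip(new StreamingQueryListener.QueryStarted(queryStartedInfo))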
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index ef2fcbf73e..41ffd56cf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.util.Utils
-class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
import AwaitTerminationTester._
import testImplicits._
@@ -215,7 +215,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
/** Run a body of code by defining a query on each dataset */
- private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = {
+ private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
failAfter(streamingTimeout) {
val queries = withClue("Error starting queries") {
datasets.zipWithIndex.map { case (ds, i) =>
@@ -269,7 +269,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
/** Stop a random active query either with `stop()` or with an error */
- private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): ContinuousQuery = {
+ private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): StreamingQuery = {
import scala.concurrent.ExecutionContext.Implicits.global
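
For orientation (not part of the patch), the renamed manager is reached through SparkSession.streams; a minimal sketch of the calls this suite drives concurrently:

    // assumes an active SparkSession named `spark`
    spark.streams.active.foreach(q => println(s"running: ${q.name} (id ${q.id})"))  // enumerate StreamingQuery handles

    spark.streams.awaitAnyTermination()      // block until any active query stops or fails (failures are rethrown)

    spark.streams.active.foreach(_.stop())   // stop whatever is still running, as the suites' cleanup does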
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index ad6bc27729..9d58315c20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, Me
import org.apache.spark.util.Utils
-class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
import AwaitTerminationTester._
import testImplicits._
@@ -37,7 +37,7 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _}
- def startQuery(queryName: String): ContinuousQuery = {
+ def startQuery(queryName: String): StreamingQuery = {
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
val writer = mapped.writeStream
writer
@@ -126,7 +126,7 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
}
/**
- * A [[StreamAction]] to test the behavior of `ContinuousQuery.awaitTermination()`.
+ * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
*
* @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown)
* @param timeoutMs Timeout in milliseconds
@@ -151,7 +151,7 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
object TestAwaitTermination {
/**
- * Tests the behavior of `ContinuousQuery.awaitTermination`.
+ * Tests the behavior of `StreamingQuery.awaitTermination`.
*
* @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown)
* @param timeoutMs Timeout in milliseconds
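
A short usage note on the two awaitTermination flavours that TestAwaitTermination drives (the `query` handle is hypothetical):

    query.awaitTermination()                                // unbounded: returns on stop(), rethrows failures as StreamingQueryException
    val finished: Boolean = query.awaitTermination(2000L)   // bounded: returns whether the query terminated within the timeout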
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index c6d374f754..1aee1934c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
@@ -265,7 +265,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
test("unique query names") {
/** Start a query with a specific name */
- def startQueryWithName(name: String = ""): ContinuousQuery = {
+ def startQueryWithName(name: String = ""): StreamingQuery = {
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load("/test")
@@ -277,7 +277,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
/** Start a query without specifying a name */
- def startQueryWithoutName(): ContinuousQuery = {
+ def startQueryWithoutName(): StreamingQuery = {
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load("/test")
@@ -434,13 +434,13 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.format("org.apache.spark.sql.streaming.test")
.load()
- val cq = df.writeStream
+ val sq = df.writeStream
.format("console")
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime(2.seconds))
.start()
- cq.awaitTermination(2000L)
+ sq.awaitTermination(2000L)
}
test("prevent all column partitioning") {