Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r--  python/pyspark/sql/readwriter.py | 40
1 file changed, 20 insertions(+), 20 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ad954d0ad8..c982de6840 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
from pyspark.sql import utils
-__all__ = ["DataFrameReader", "DataFrameWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]
def to_str(value):
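The `__all__` change above makes the renamed streaming classes part of the module's public surface alongside the batch reader/writer. A minimal sketch of what that exposes, assuming a Spark 2.0-era session named `spark` (the session name and input paths are illustrative, not part of this diff):

    from pyspark.sql.readwriter import (
        DataFrameReader, DataFrameWriter, DataStreamReader, DataStreamWriter,
    )

    # Batch I/O still goes through DataFrameReader/DataFrameWriter ...
    df = spark.read.json("examples/src/main/resources/people.json")
    assert isinstance(df.write, DataFrameWriter)

    # ... while the streaming entry points resolve to the newly exported classes.
    sdf = spark.readStream.text("/tmp/stream-input")  # hypothetical input path
    assert isinstance(sdf.writeStream, DataStreamWriter)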
@@ -458,9 +458,9 @@ class DataFrameWriter(object):
self._spark = df.sql_ctx
self._jwrite = df._jdf.write()
- def _cq(self, jcq):
- from pyspark.sql.streaming import ContinuousQuery
- return ContinuousQuery(jcq)
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
@since(1.4)
def mode(self, saveMode):
@@ -1094,9 +1094,9 @@ class DataStreamWriter(object):
self._spark = df.sql_ctx
self._jwrite = df._jdf.writeStream()
- def _cq(self, jcq):
- from pyspark.sql.streaming import ContinuousQuery
- return ContinuousQuery(jcq)
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
@since(2.0)
def outputMode(self, outputMode):
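Both `_cq` → `_sq` hunks follow the same pattern: the Py4J handle returned by the JVM side is wrapped in the Python `StreamingQuery` class, and the import stays local to the method to avoid a circular import between the `readwriter` and `streaming` modules. A hedged sketch of driving the wrapped object from user code, assuming `sdf` is a streaming DataFrame as in the doctests further down (method names follow the Spark 2.0 `StreamingQuery` API; the timeout value is illustrative):

    sq = sdf.writeStream.format("memory").queryName("demo").start()
    print(sq.id, sq.name)           # identifiers exposed by the wrapper
    sq.processAllAvailable()        # block until all available input is processed
    sq.awaitTermination(timeout=5)  # wait up to 5 seconds, then return
    sq.stop()                       # terminate the query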
@@ -1169,8 +1169,8 @@ class DataStreamWriter(object):
@since(2.0)
def queryName(self, queryName):
- """Specifies the name of the :class:`ContinuousQuery` that can be started with
- :func:`startStream`. This name must be unique among all the currently active queries
+ """Specifies the name of the :class:`StreamingQuery` that can be started with
+ :func:`start`. This name must be unique among all the currently active queries
in the associated SparkSession.
.. note:: Experimental.
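The uniqueness requirement in the reworded docstring applies only to currently active queries: a name can be reused once the earlier query stops. A small sketch under that assumption, with `sdf` as in the doctests below:

    sq = sdf.writeStream.format("memory").queryName("events").start()
    # Starting a second query named "events" while sq is active would fail;
    # once sq is stopped, the name becomes available again.
    sq.stop()
    sq2 = sdf.writeStream.format("memory").queryName("events").start()
    sq2.stop()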
@@ -1232,21 +1232,21 @@ class DataStreamWriter(object):
:param options: All other string options. You may want to provide a `checkpointLocation`
for most streams; however, it is not required for a `memory` stream.
- >>> cq = sdf.writeStream.format('memory').queryName('this_query').start()
- >>> cq.isActive
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.isActive
True
- >>> cq.name
+ >>> sq.name
u'this_query'
- >>> cq.stop()
- >>> cq.isActive
+ >>> sq.stop()
+ >>> sq.isActive
False
- >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+ >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
... queryName='that_query', format='memory')
- >>> cq.name
+ >>> sq.name
u'that_query'
- >>> cq.isActive
+ >>> sq.isActive
True
- >>> cq.stop()
+ >>> sq.stop()
"""
self.options(**options)
if partitionBy is not None:
@@ -1256,9 +1256,9 @@ class DataStreamWriter(object):
if queryName is not None:
self.queryName(queryName)
if path is None:
- return self._cq(self._jwrite.start())
+ return self._sq(self._jwrite.start())
else:
- return self._cq(self._jwrite.start(path))
+ return self._sq(self._jwrite.start(path))
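The final hunk shows the two dispatch forms of `start()`: with no path, the sink is configured entirely through options; with a path, it is forwarded to `_jwrite.start(path)`. A hedged usage sketch of the path form with a file sink (the paths and the `parquet` format choice are illustrative; unlike the `memory` sink in the doctests above, file sinks need a `checkpointLocation`):

    sq = (sdf.writeStream
          .format("parquet")
          .option("checkpointLocation", "/tmp/ckpt")  # required for file sinks
          .start("/tmp/out"))                         # path goes to _jwrite.start(path)
    sq.stop()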
def _test():