diff options
Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r-- | python/pyspark/sql/readwriter.py | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index ad954d0ad8..c982de6840 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq from pyspark.sql.types import * from pyspark.sql import utils -__all__ = ["DataFrameReader", "DataFrameWriter"] +__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"] def to_str(value): @@ -458,9 +458,9 @@ class DataFrameWriter(object): self._spark = df.sql_ctx self._jwrite = df._jdf.write() - def _cq(self, jcq): - from pyspark.sql.streaming import ContinuousQuery - return ContinuousQuery(jcq) + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) @since(1.4) def mode(self, saveMode): @@ -1094,9 +1094,9 @@ class DataStreamWriter(object): self._spark = df.sql_ctx self._jwrite = df._jdf.writeStream() - def _cq(self, jcq): - from pyspark.sql.streaming import ContinuousQuery - return ContinuousQuery(jcq) + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) @since(2.0) def outputMode(self, outputMode): @@ -1169,8 +1169,8 @@ class DataStreamWriter(object): @since(2.0) def queryName(self, queryName): - """Specifies the name of the :class:`ContinuousQuery` that can be started with - :func:`startStream`. This name must be unique among all the currently active queries + """Specifies the name of the :class:`StreamingQuery` that can be started with + :func:`start`. This name must be unique among all the currently active queries in the associated SparkSession. .. note:: Experimental. @@ -1232,21 +1232,21 @@ class DataStreamWriter(object): :param options: All other string options. You may want to provide a `checkpointLocation` for most streams, however it is not required for a `memory` stream. - >>> cq = sdf.writeStream.format('memory').queryName('this_query').start() - >>> cq.isActive + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.isActive True - >>> cq.name + >>> sq.name u'this_query' - >>> cq.stop() - >>> cq.isActive + >>> sq.stop() + >>> sq.isActive False - >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start( + >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( ... queryName='that_query', format='memory') - >>> cq.name + >>> sq.name u'that_query' - >>> cq.isActive + >>> sq.isActive True - >>> cq.stop() + >>> sq.stop() """ self.options(**options) if partitionBy is not None: @@ -1256,9 +1256,9 @@ class DataStreamWriter(object): if queryName is not None: self.queryName(queryName) if path is None: - return self._cq(self._jwrite.start()) + return self._sq(self._jwrite.start()) else: - return self._cq(self._jwrite.start(path)) + return self._sq(self._jwrite.start(path)) def _test(): |