author     Burak Yavuz <brkyvz@gmail.com>               2016-04-20 10:32:01 -0700
committer  Michael Armbrust <michael@databricks.com>    2016-04-20 10:32:01 -0700
commit     80bf48f437939ddc3bb82c8c7530c8ae419f8427 (patch)
tree       e9be7bd9acac75d677eaad8ba69c84890d4913d3 /python/pyspark/sql/readwriter.py
parent     834277884fcdaab4758604272881ffb2369e25f0 (diff)
[SPARK-14555] First cut of Python API for Structured Streaming
## What changes were proposed in this pull request?
This patch provides a first cut of the Python API for Structured Streaming. It adds the following new classes:
- ContinuousQuery
- Trigger
- ProcessingTime
in pyspark under `pyspark.sql.streaming`.
In addition, it adds the following new methods under the existing classes:
- `DataFrameWriter`
a) `startStream`
b) `trigger`
c) `queryName`
- `DataFrameReader`
a) `stream`
- `DataFrame`
a) `isStreaming`
This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:
- `exception`
- `sourceStatuses`
- `sinkStatus`
They may be added in a follow-up.
This PR also contains some very minor doc fixes on the Scala side.
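
As an illustration, here is a minimal usage sketch of the new Python API. It mirrors the doctests added in this patch and assumes an existing `SQLContext` named `sqlContext` plus the `python/test_support/sql/streaming` test data directory used by those doctests:

```python
# Read a streaming DataFrame from a file-based source (sketch; assumes an
# existing SQLContext `sqlContext` and the doctest data directory).
sdf = sqlContext.read.format('text').stream('python/test_support/sql/streaming')
sdf.isStreaming  # True

# Start a continuous query into the in-memory sink, triggered every 5 seconds.
cq = sdf.write.format('memory') \
              .queryName('this_query') \
              .trigger(processingTime='5 seconds') \
              .startStream()
cq.isActive  # True
cq.stop()
```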
## How was this patch tested?
Python doc tests
TODO:
- [ ] verify Python docs look good
Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <burak@databricks.com>
Closes #12320 from brkyvz/stream-python.
Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r--  python/pyspark/sql/readwriter.py  121
1 file changed, 120 insertions, 1 deletion
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 0cef37e57c..6c809d1139 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -22,7 +22,7 @@ if sys.version >= '3':
 
 from py4j.java_gateway import JavaClass
 
-from pyspark import RDD, since
+from pyspark import RDD, since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
@@ -136,6 +136,32 @@ class DataFrameReader(object):
         else:
             return self._df(self._jreader.load())
 
+    @since(2.0)
+    def stream(self, path=None, format=None, schema=None, **options):
+        """Loads a data stream from a data source and returns it as a :class:`DataFrame`.
+
+        :param path: optional string for file-system backed data sources.
+        :param format: optional string for format of the data source. Default to 'parquet'.
+        :param schema: optional :class:`StructType` for the input schema.
+        :param options: all other string options
+
+        >>> df = sqlContext.read.format('text').stream('python/test_support/sql/streaming')
+        >>> df.isStreaming
+        True
+        """
+        if format is not None:
+            self.format(format)
+        if schema is not None:
+            self.schema(schema)
+        self.options(**options)
+        if path is not None:
+            if type(path) != str or len(path.strip()) == 0:
+                raise ValueError("If the path is provided for stream, it needs to be a " +
+                                 "non-empty string. List of paths are not supported.")
+            return self._df(self._jreader.stream(path))
+        else:
+            return self._df(self._jreader.stream())
+
     @since(1.4)
     def json(self, path, schema=None):
         """
@@ -334,6 +360,10 @@ class DataFrameWriter(object):
         self._sqlContext = df.sql_ctx
         self._jwrite = df._jdf.write()
 
+    def _cq(self, jcq):
+        from pyspark.sql.streaming import ContinuousQuery
+        return ContinuousQuery(jcq, self._sqlContext)
+
     @since(1.4)
     def mode(self, saveMode):
         """Specifies the behavior when data or table already exists.
@@ -395,6 +425,44 @@ class DataFrameWriter(object):
         self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
         return self
 
+    @since(2.0)
+    def queryName(self, queryName):
+        """Specifies the name of the :class:`ContinuousQuery` that can be started with
+        :func:`startStream`. This name must be unique among all the currently active queries
+        in the associated SQLContext.
+
+        :param queryName: unique name for the query
+
+        >>> writer = sdf.write.queryName('streaming_query')
+        """
+        if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
+            raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
+        self._jwrite = self._jwrite.queryName(queryName)
+        return self
+
+    @keyword_only
+    @since(2.0)
+    def trigger(self, processingTime=None):
+        """Set the trigger for the stream query. If this is not set it will run the query as fast
+        as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+
+        :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+
+        >>> # trigger the query for execution every 5 seconds
+        >>> writer = sdf.write.trigger(processingTime='5 seconds')
+        """
+        from pyspark.sql.streaming import ProcessingTime
+        trigger = None
+        if processingTime is not None:
+            if type(processingTime) != str or len(processingTime.strip()) == 0:
+                raise ValueError('The processing time must be a non empty string. Got: %s' %
+                                 processingTime)
+            trigger = ProcessingTime(processingTime)
+        if trigger is None:
+            raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
+        self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._sqlContext))
+        return self
+
     @since(1.4)
     def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
@@ -426,6 +494,55 @@ class DataFrameWriter(object):
         else:
             self._jwrite.save(path)
 
+    @ignore_unicode_prefix
+    @since(2.0)
+    def startStream(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+        """Streams the contents of the :class:`DataFrame` to a data source.
+
+        The data source is specified by the ``format`` and a set of ``options``.
+        If ``format`` is not specified, the default data source configured by
+        ``spark.sql.sources.default`` will be used.
+
+        :param path: the path in a Hadoop supported file system
+        :param format: the format used to save
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
+        :param partitionBy: names of partitioning columns
+        :param queryName: unique name for the query
+        :param options: All other string options. You may want to provide a `checkpointLocation`
+            for most streams, however it is not required for a `memory` stream.
+
+        >>> cq = sdf.write.format('memory').queryName('this_query').startStream()
+        >>> cq.isActive
+        True
+        >>> cq.name
+        u'this_query'
+        >>> cq.stop()
+        >>> cq.isActive
+        False
+        >>> cq = sdf.write.trigger(processingTime='5 seconds').startStream(
+        ...     queryName='that_query', format='memory')
+        >>> cq.name
+        u'that_query'
+        >>> cq.isActive
+        True
+        >>> cq.stop()
+        """
+        self.options(**options)
+        if partitionBy is not None:
+            self.partitionBy(partitionBy)
+        if format is not None:
+            self.format(format)
+        if queryName is not None:
+            self.queryName(queryName)
+        if path is None:
+            return self._cq(self._jwrite.startStream())
+        else:
+            return self._cq(self._jwrite.startStream(path))
+
     @since(1.4)
     def insertInto(self, tableName, overwrite=False):
         """Inserts the content of the :class:`DataFrame` to the specified table.
@@ -625,6 +742,8 @@ def _test():
     globs['sqlContext'] = SQLContext(sc)
     globs['hiveContext'] = HiveContext(sc)
     globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
+    globs['sdf'] =\
+        globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,