Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r--  python/pyspark/sql/readwriter.py | 121
1 file changed, 120 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 0cef37e57c..6c809d1139 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -22,7 +22,7 @@ if sys.version >= '3':
from py4j.java_gateway import JavaClass
-from pyspark import RDD, since
+from pyspark import RDD, since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
@@ -136,6 +136,32 @@ class DataFrameReader(object):
else:
return self._df(self._jreader.load())
+ @since(2.0)
+ def stream(self, path=None, format=None, schema=None, **options):
+ """Loads a data stream from a data source and returns it as a :class`DataFrame`.
+
+ :param path: optional string for file-system backed data sources.
+ :param format: optional string for the format of the data source. Defaults to 'parquet'.
+ :param schema: optional :class:`StructType` for the input schema.
+ :param options: all other string options
+
+ >>> df = sqlContext.read.format('text').stream('python/test_support/sql/streaming')
+ >>> df.isStreaming
+ True
+ """
+ if format is not None:
+ self.format(format)
+ if schema is not None:
+ self.schema(schema)
+ self.options(**options)
+ if path is not None:
+ if type(path) != str or len(path.strip()) == 0:
+ raise ValueError("If the path is provided for stream, it needs to be a " +
+ "non-empty string. List of paths are not supported.")
+ return self._df(self._jreader.stream(path))
+ else:
+ return self._df(self._jreader.stream())
+
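# A minimal sketch of the new stream() reader, assuming a SQLContext named
# `sqlContext` and a hypothetical directory 'data/events' of JSON files.
# File-based streaming sources generally need an explicit schema up front.
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("event", StringType(), True),
    StructField("timestamp", LongType(), True),
])
events = sqlContext.read.stream('data/events', format='json', schema=schema)
events.isStreaming   # True; the DataFrame is backed by a streaming source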
@since(1.4)
def json(self, path, schema=None):
"""
@@ -334,6 +360,10 @@ class DataFrameWriter(object):
self._sqlContext = df.sql_ctx
self._jwrite = df._jdf.write()
+ def _cq(self, jcq):
+ from pyspark.sql.streaming import ContinuousQuery
+ return ContinuousQuery(jcq, self._sqlContext)
+
@since(1.4)
def mode(self, saveMode):
"""Specifies the behavior when data or table already exists.
@@ -395,6 +425,44 @@ class DataFrameWriter(object):
self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
return self
+ @since(2.0)
+ def queryName(self, queryName):
+ """Specifies the name of the :class:`ContinuousQuery` that can be started with
+ :func:`startStream`. This name must be unique among all the currently active queries
+ in the associated SQLContext.
+
+ :param queryName: unique name for the query
+
+ >>> writer = sdf.write.queryName('streaming_query')
+ """
+ if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
+ raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
+ self._jwrite = self._jwrite.queryName(queryName)
+ return self
+
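# Sketch of the builder pattern, assuming `sdf` is the streaming DataFrame from
# the doctests. queryName() returns the writer, so it chains with the other
# builder methods, and a blank name is rejected by the check above.
writer = sdf.write.format('memory').queryName('events')
try:
    sdf.write.queryName('   ')
except ValueError as e:
    print(e)   # The queryName must be a non-empty string. Got: ...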
+ @keyword_only
+ @since(2.0)
+ def trigger(self, processingTime=None):
+ """Set the trigger for the stream query. If this is not set it will run the query as fast
+ as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+
+ :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+
+ >>> # trigger the query for execution every 5 seconds
+ >>> writer = sdf.write.trigger(processingTime='5 seconds')
+ """
+ from pyspark.sql.streaming import ProcessingTime
+ trigger = None
+ if processingTime is not None:
+ if type(processingTime) != str or len(processingTime.strip()) == 0:
+ raise ValueError('The processing time must be a non-empty string. Got: %s' %
+ processingTime)
+ trigger = ProcessingTime(processingTime)
+ if trigger is None:
+ raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
+ self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._sqlContext))
+ return self
+
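# Sketch, again assuming `sdf` from the doctests. trigger() is keyword-only, and
# processingTime is the only trigger this patch supports; '10 seconds' is an
# arbitrary example interval.
writer = sdf.write.trigger(processingTime='10 seconds')
try:
    sdf.write.trigger()   # no trigger supplied
except ValueError as e:
    print(e)              # A trigger was not provided. Supported triggers: processingTime.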
@since(1.4)
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
@@ -426,6 +494,55 @@ class DataFrameWriter(object):
else:
self._jwrite.save(path)
+ @ignore_unicode_prefix
+ @since(2.0)
+ def startStream(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+ """Streams the contents of the :class:`DataFrame` to a data source.
+
+ The data source is specified by the ``format`` and a set of ``options``.
+ If ``format`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ :param path: the path in a Hadoop supported file system
+ :param format: the format used to save
+ :param partitionBy: names of partitioning columns
+ :param queryName: unique name for the query
+ :param options: all other string options. You may want to provide a ``checkpointLocation``
+ for most streams; it is not required for a ``memory`` stream.
+
+ >>> cq = sdf.write.format('memory').queryName('this_query').startStream()
+ >>> cq.isActive
+ True
+ >>> cq.name
+ u'this_query'
+ >>> cq.stop()
+ >>> cq.isActive
+ False
+ >>> cq = sdf.write.trigger(processingTime='5 seconds').startStream(
+ ... queryName='that_query', format='memory')
+ >>> cq.name
+ u'that_query'
+ >>> cq.isActive
+ True
+ >>> cq.stop()
+ """
+ self.options(**options)
+ if partitionBy is not None:
+ self.partitionBy(partitionBy)
+ if format is not None:
+ self.format(format)
+ if queryName is not None:
+ self.queryName(queryName)
+ if path is None:
+ return self._cq(self._jwrite.startStream())
+ else:
+ return self._cq(self._jwrite.startStream(path))
+
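# End-to-end sketch, assuming the text source directory used in the doctests.
# The memory sink is assumed to expose the stream's rows as a temporary table
# named after the query; a file-based sink would additionally need a
# checkpointLocation option, which memory does not.
sdf = sqlContext.read.format('text').stream('python/test_support/sql/streaming')
cq = sdf.write.format('memory').queryName('text_rows').startStream()
sqlContext.sql("SELECT count(*) FROM text_rows").show()
cq.stop()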
@since(1.4)
def insertInto(self, tableName, overwrite=False):
"""Inserts the content of the :class:`DataFrame` to the specified table.
@@ -625,6 +742,8 @@ def _test():
globs['sqlContext'] = SQLContext(sc)
globs['hiveContext'] = HiveContext(sc)
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
+ globs['sdf'] = \
+ globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,