Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/context.py    |  13
-rw-r--r--  python/pyspark/sql/dataframe.py  |  18
-rw-r--r--  python/pyspark/sql/readwriter.py | 640
-rw-r--r--  python/pyspark/sql/session.py    |  17
-rw-r--r--  python/pyspark/sql/streaming.py  |   8
-rw-r--r--  python/pyspark/sql/tests.py      |  42
6 files changed, 570 insertions, 168 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 60f62b219b..a271afe4cf 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -430,6 +430,19 @@ class SQLContext(object):
@property
@since(2.0)
+ def readStream(self):
+ """
+ Returns a :class:`DataStreamReader` that can be used to read data streams
+ as a streaming :class:`DataFrame`.
+
+ .. note:: Experimental.
+
+ :return: :class:`DataStreamReader`
+ """
+ return DataStreamReader(self._wrapped)
+
+ @property
+ @since(2.0)
def streams(self):
"""Returns a :class:`ContinuousQueryManager` that allows managing all the
:class:`ContinuousQuery` ContinuousQueries active on `this` context.
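For reference, a minimal sketch of the new SQLContext.readStream entry point, assuming PySpark built from this tree and run from the Spark source root so that the bundled python/test_support/sql/streaming text files exist (the app name is arbitrary):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="readstream_sketch")
    sqlContext = SQLContext(sc)

    # readStream mirrors sqlContext.read, but returns a DataStreamReader that
    # builds a streaming (unbounded) DataFrame instead of a batch one.
    sdf = sqlContext.readStream.format("text").load("python/test_support/sql/streaming")
    print(sdf.isStreaming)              # True
    print(sdf.schema.simpleString())    # struct<value:string>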
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4fa799ac55..0126faf574 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -33,7 +33,7 @@ from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.sql.types import _parse_datatype_json_string
from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
-from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.readwriter import DataFrameWriter, DataStreamWriter
from pyspark.sql.types import *
__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
@@ -172,13 +172,27 @@ class DataFrame(object):
@since(1.4)
def write(self):
"""
- Interface for saving the content of the :class:`DataFrame` out into external storage.
+ Interface for saving the content of the non-streaming :class:`DataFrame` out into external
+ storage.
:return: :class:`DataFrameWriter`
"""
return DataFrameWriter(self)
@property
+ @since(2.0)
+ def writeStream(self):
+ """
+ Interface for saving the content of the streaming :class:`DataFrame` out into external
+ storage.
+
+ .. note:: Experimental.
+
+ :return: :class:`DataStreamWriter`
+ """
+ return DataStreamWriter(self)
+
+ @property
@since(1.3)
def schema(self):
"""Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 0f50f672a2..ad954d0ad8 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -137,34 +137,6 @@ class DataFrameReader(object):
else:
return self._df(self._jreader.load())
- @since(2.0)
- def stream(self, path=None, format=None, schema=None, **options):
- """Loads a data stream from a data source and returns it as a :class`DataFrame`.
-
- .. note:: Experimental.
-
- :param path: optional string for file-system backed data sources.
- :param format: optional string for format of the data source. Default to 'parquet'.
- :param schema: optional :class:`StructType` for the input schema.
- :param options: all other string options
-
- >>> df = spark.read.format('text').stream('python/test_support/sql/streaming')
- >>> df.isStreaming
- True
- """
- if format is not None:
- self.format(format)
- if schema is not None:
- self.schema(schema)
- self.options(**options)
- if path is not None:
- if type(path) != str or len(path.strip()) == 0:
- raise ValueError("If the path is provided for stream, it needs to be a " +
- "non-empty string. List of paths are not supported.")
- return self._df(self._jreader.stream(path))
- else:
- return self._df(self._jreader.stream())
-
@since(1.4)
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
@@ -509,26 +481,6 @@ class DataFrameWriter(object):
self._jwrite = self._jwrite.mode(saveMode)
return self
- @since(2.0)
- def outputMode(self, outputMode):
- """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
-
- Options include:
-
- * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to
- the sink
- * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
- every time these is some updates
-
- .. note:: Experimental.
-
- >>> writer = sdf.write.outputMode('append')
- """
- if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
- raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
- self._jwrite = self._jwrite.outputMode(outputMode)
- return self
-
@since(1.4)
def format(self, source):
"""Specifies the underlying output data source.
@@ -571,48 +523,6 @@ class DataFrameWriter(object):
self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
return self
- @since(2.0)
- def queryName(self, queryName):
- """Specifies the name of the :class:`ContinuousQuery` that can be started with
- :func:`startStream`. This name must be unique among all the currently active queries
- in the associated SparkSession.
-
- .. note:: Experimental.
-
- :param queryName: unique name for the query
-
- >>> writer = sdf.write.queryName('streaming_query')
- """
- if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
- raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
- self._jwrite = self._jwrite.queryName(queryName)
- return self
-
- @keyword_only
- @since(2.0)
- def trigger(self, processingTime=None):
- """Set the trigger for the stream query. If this is not set it will run the query as fast
- as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
-
- .. note:: Experimental.
-
- :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
-
- >>> # trigger the query for execution every 5 seconds
- >>> writer = sdf.write.trigger(processingTime='5 seconds')
- """
- from pyspark.sql.streaming import ProcessingTime
- trigger = None
- if processingTime is not None:
- if type(processingTime) != str or len(processingTime.strip()) == 0:
- raise ValueError('The processing time must be a non empty string. Got: %s' %
- processingTime)
- trigger = ProcessingTime(processingTime)
- if trigger is None:
- raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
- self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
- return self
-
@since(1.4)
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
@@ -644,57 +554,6 @@ class DataFrameWriter(object):
else:
self._jwrite.save(path)
- @ignore_unicode_prefix
- @since(2.0)
- def startStream(self, path=None, format=None, partitionBy=None, queryName=None, **options):
- """Streams the contents of the :class:`DataFrame` to a data source.
-
- The data source is specified by the ``format`` and a set of ``options``.
- If ``format`` is not specified, the default data source configured by
- ``spark.sql.sources.default`` will be used.
-
- .. note:: Experimental.
-
- :param path: the path in a Hadoop supported file system
- :param format: the format used to save
-
- * ``append``: Append contents of this :class:`DataFrame` to existing data.
- * ``overwrite``: Overwrite existing data.
- * ``ignore``: Silently ignore this operation if data already exists.
- * ``error`` (default case): Throw an exception if data already exists.
- :param partitionBy: names of partitioning columns
- :param queryName: unique name for the query
- :param options: All other string options. You may want to provide a `checkpointLocation`
- for most streams, however it is not required for a `memory` stream.
-
- >>> cq = sdf.write.format('memory').queryName('this_query').startStream()
- >>> cq.isActive
- True
- >>> cq.name
- u'this_query'
- >>> cq.stop()
- >>> cq.isActive
- False
- >>> cq = sdf.write.trigger(processingTime='5 seconds').startStream(
- ... queryName='that_query', format='memory')
- >>> cq.name
- u'that_query'
- >>> cq.isActive
- True
- >>> cq.stop()
- """
- self.options(**options)
- if partitionBy is not None:
- self.partitionBy(partitionBy)
- if format is not None:
- self.format(format)
- if queryName is not None:
- self.queryName(queryName)
- if path is None:
- return self._cq(self._jwrite.startStream())
- else:
- return self._cq(self._jwrite.startStream(path))
-
@since(1.4)
def insertInto(self, tableName, overwrite=False):
"""Inserts the content of the :class:`DataFrame` to the specified table.
@@ -905,6 +764,503 @@ class DataFrameWriter(object):
self._jwrite.mode(mode).jdbc(url, table, jprop)
+class DataStreamReader(object):
+ """
+ Interface used to load a streaming :class:`DataFrame` from external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, spark):
+ self._jreader = spark._ssql_ctx.readStream()
+ self._spark = spark
+
+ def _df(self, jdf):
+ from pyspark.sql.dataframe import DataFrame
+ return DataFrame(jdf, self._spark)
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the input data source format.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ """
+ self._jreader = self._jreader.format(source)
+ return self
+
+ @since(2.0)
+ def schema(self, schema):
+ """Specifies the input schema.
+
+ Some data sources (e.g. JSON) can infer the input schema automatically from data.
+ By specifying the schema here, the underlying data source can skip the schema
+ inference step, and thus speed up data loading.
+
+ .. note:: Experimental.
+
+ :param schema: a StructType object
+ """
+ if not isinstance(schema, StructType):
+ raise TypeError("schema should be StructType")
+ jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+ self._jreader = self._jreader.schema(jschema)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an input option for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ self._jreader = self._jreader.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds input options for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ for k in options:
+ self._jreader = self._jreader.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def load(self, path=None, format=None, schema=None, **options):
+        """Loads a data stream from a data source and returns it as a :class:`DataFrame`.
+
+ .. note:: Experimental.
+
+ :param path: optional string for file-system backed data sources.
+        :param format: optional string for format of the data source. Defaults to 'parquet'.
+ :param schema: optional :class:`StructType` for the input schema.
+ :param options: all other string options
+
+ """
+ if format is not None:
+ self.format(format)
+ if schema is not None:
+ self.schema(schema)
+ self.options(**options)
+ if path is not None:
+ if type(path) != str or len(path.strip()) == 0:
+ raise ValueError("If the path is provided for stream, it needs to be a " +
+ "non-empty string. List of paths are not supported.")
+ return self._df(self._jreader.load(path))
+ else:
+ return self._df(self._jreader.load())
+
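The doctest that the removed stream() method carried is not reproduced on load(); a hedged equivalent, modelled on test_stream_read_options in the tests further below, showing that schema and a 'path' option can be set on the builder before calling load() with no arguments:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.appName("load_sketch").getOrCreate()

    schema = StructType([StructField("data", StringType(), False)])
    sdf = (spark.readStream
                .format("text")
                .option("path", "python/test_support/sql/streaming")
                .schema(schema)
                .load())                        # no path argument: taken from the option
    print(sdf.isStreaming)                      # True
    print(sdf.schema.simpleString())            # struct<data:string>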
+ @since(2.0)
+ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+ allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
+ """
+        Loads a JSON file stream (one object per line) and returns a :class:`DataFrame`.
+
+ If the ``schema`` parameter is not specified, this function goes
+ through the input once to determine the input schema.
+
+ .. note:: Experimental.
+
+        :param path: string representing the path to the JSON dataset.
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param primitivesAsString: infers all primitive values as a string type. If None is set,
+ it uses the default value, ``false``.
+ :param prefersDecimal: infers all floating-point values as a decimal type. If the values
+ do not fit in decimal, then it infers them as doubles. If None is
+ set, it uses the default value, ``false``.
+ :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
+ it uses the default value, ``false``.
+ :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
+ it uses the default value, ``false``.
+ :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
+ set, it uses the default value, ``true``.
+ :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
+ set, it uses the default value, ``false``.
+        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all characters
+                                                   using the backslash quoting mechanism. If None is
+                                                   set, it uses the default value, ``false``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record and puts the malformed string into a new field configured by \
+ ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+ ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets corrupted records.
+
+ :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+ created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
+ """
+ if schema is not None:
+ self.schema(schema)
+ if primitivesAsString is not None:
+ self.option("primitivesAsString", primitivesAsString)
+ if prefersDecimal is not None:
+ self.option("prefersDecimal", prefersDecimal)
+ if allowComments is not None:
+ self.option("allowComments", allowComments)
+ if allowUnquotedFieldNames is not None:
+ self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+ if allowSingleQuotes is not None:
+ self.option("allowSingleQuotes", allowSingleQuotes)
+ if allowNumericLeadingZero is not None:
+ self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+ if allowBackslashEscapingAnyCharacter is not None:
+ self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter)
+ if mode is not None:
+ self.option("mode", mode)
+ if columnNameOfCorruptRecord is not None:
+ self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        if isinstance(path, basestring):
+            return self._df(self._jreader.json(path))
+        else:
+            raise TypeError("path can be only a single string")
+
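A short sketch of the streaming JSON reader; the directory /tmp/events and the column names are hypothetical, and the schema is supplied up front since streaming file sources generally expect one:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, LongType

    spark = SparkSession.builder.appName("json_stream_sketch").getOrCreate()

    events_schema = StructType([
        StructField("id", LongType(), True),
        StructField("msg", StringType(), True),
    ])
    # Watches /tmp/events for newline-delimited JSON files as they arrive.
    sdf = spark.readStream.json("/tmp/events", schema=events_schema)
    print(sdf.isStreaming)   # True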
+ @since(2.0)
+ def parquet(self, path):
+ """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+
+ You can set the following Parquet-specific option(s) for reading Parquet files:
+ * ``mergeSchema``: sets whether we should merge schemas collected from all \
+ Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
+ The default value is specified in ``spark.sql.parquet.mergeSchema``.
+
+ .. note:: Experimental.
+
+ """
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.parquet(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def text(self, path):
+ """
+        Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
+        string column named "value", followed by partitioned columns if there
+        are any.
+
+ Each line in the text file is a new row in the resulting DataFrame.
+
+ .. note:: Experimental.
+
+        :param path: string for the input path.
+
+ """
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
+ comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
+ ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+ """Loads a CSV file stream and returns the result as a :class:`DataFrame`.
+
+ This function will go through the input once to determine the input schema if
+ ``inferSchema`` is enabled. To avoid going through the entire data once, disable
+ ``inferSchema`` option or specify the schema explicitly using ``schema``.
+
+ .. note:: Experimental.
+
+        :param path: string for the input path.
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param sep: sets the single character as a separator for each field and value.
+ If None is set, it uses the default value, ``,``.
+ :param encoding: decodes the CSV files by the given encoding type. If None is set,
+ it uses the default value, ``UTF-8``.
+ :param quote: sets the single character used for escaping quoted values where the
+ separator can be part of the value. If None is set, it uses the default
+ value, ``"``. If you would like to turn off quotations, you need to set an
+ empty string.
+ :param escape: sets the single character used for escaping quotes inside an already
+ quoted value. If None is set, it uses the default value, ``\``.
+ :param comment: sets the single character used for skipping lines beginning with this
+ character. By default (None), it is disabled.
+ :param header: uses the first line as names of columns. If None is set, it uses the
+ default value, ``false``.
+ :param inferSchema: infers the input schema automatically from data. It requires one extra
+ pass over the data. If None is set, it uses the default value, ``false``.
+ :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param nullValue: sets the string representation of a null value. If None is set, it uses
+ the default value, empty string.
+ :param nanValue: sets the string representation of a non-number value. If None is set, it
+ uses the default value, ``NaN``.
+ :param positiveInf: sets the string representation of a positive infinity value. If None
+ is set, it uses the default value, ``Inf``.
+        :param negativeInf: sets the string representation of a negative infinity value. If None
+                            is set, it uses the default value, ``-Inf``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to both date type and timestamp type. By default, it is None
+ which means trying to parse times and date by
+ ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ :param maxColumns: defines a hard limit of how many columns a record can have. If None is
+ set, it uses the default value, ``20480``.
+ :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
+ value being read. If None is set, it uses the default value,
+ ``1000000``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
+ When a schema is set by user, it sets ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets corrupted records.
+
+ """
+ if schema is not None:
+ self.schema(schema)
+ if sep is not None:
+ self.option("sep", sep)
+ if encoding is not None:
+ self.option("encoding", encoding)
+ if quote is not None:
+ self.option("quote", quote)
+ if escape is not None:
+ self.option("escape", escape)
+ if comment is not None:
+ self.option("comment", comment)
+ if header is not None:
+ self.option("header", header)
+ if inferSchema is not None:
+ self.option("inferSchema", inferSchema)
+ if ignoreLeadingWhiteSpace is not None:
+ self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+ if ignoreTrailingWhiteSpace is not None:
+ self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+ if nullValue is not None:
+ self.option("nullValue", nullValue)
+ if nanValue is not None:
+ self.option("nanValue", nanValue)
+ if positiveInf is not None:
+ self.option("positiveInf", positiveInf)
+ if negativeInf is not None:
+ self.option("negativeInf", negativeInf)
+ if dateFormat is not None:
+ self.option("dateFormat", dateFormat)
+ if maxColumns is not None:
+ self.option("maxColumns", maxColumns)
+ if maxCharsPerColumn is not None:
+ self.option("maxCharsPerColumn", maxCharsPerColumn)
+ if mode is not None:
+ self.option("mode", mode)
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+
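And a sketch of the streaming CSV reader with a few of the options documented above; /tmp/prices, the separator, and the column names are hypothetical:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.appName("csv_stream_sketch").getOrCreate()

    prices_schema = StructType([
        StructField("symbol", StringType(), True),
        StructField("price", DoubleType(), True),
    ])
    sdf = spark.readStream.csv("/tmp/prices",
                               schema=prices_schema,
                               sep=",",
                               header=True,              # first line of each file is a header
                               mode="DROPMALFORMED")     # silently drop corrupt records
    print(sdf.schema.simpleString())   # struct<symbol:string,price:double>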
+class DataStreamWriter(object):
+ """
+ Interface used to write a streaming :class:`DataFrame` to external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, df):
+ self._df = df
+ self._spark = df.sql_ctx
+ self._jwrite = df._jdf.writeStream()
+
+ def _cq(self, jcq):
+ from pyspark.sql.streaming import ContinuousQuery
+ return ContinuousQuery(jcq)
+
+ @since(2.0)
+ def outputMode(self, outputMode):
+ """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+
+ Options include:
+
+        * ``append``: Only the new rows in the streaming DataFrame/Dataset will be written to
+          the sink.
+        * ``complete``: All the rows in the streaming DataFrame/Dataset will be written to the
+          sink every time there are some updates.
+
+ .. note:: Experimental.
+
+ >>> writer = sdf.writeStream.outputMode('append')
+ """
+ if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
+ raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
+ self._jwrite = self._jwrite.outputMode(outputMode)
+ return self
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the underlying output data source.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ >>> writer = sdf.writeStream.format('json')
+ """
+ self._jwrite = self._jwrite.format(source)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an output option for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ self._jwrite = self._jwrite.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds output options for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ for k in options:
+ self._jwrite = self._jwrite.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def partitionBy(self, *cols):
+ """Partitions the output by the given columns on the file system.
+
+ If specified, the output is laid out on the file system similar
+ to Hive's partitioning scheme.
+
+ .. note:: Experimental.
+
+ :param cols: name of columns
+
+ """
+ if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+ cols = cols[0]
+ self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
+ return self
+
+ @since(2.0)
+ def queryName(self, queryName):
+        """Specifies the name of the :class:`ContinuousQuery` that can be started with
+        :func:`start`. This name must be unique among all the currently active queries
+ in the associated SparkSession.
+
+ .. note:: Experimental.
+
+ :param queryName: unique name for the query
+
+ >>> writer = sdf.writeStream.queryName('streaming_query')
+ """
+ if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
+ raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
+ self._jwrite = self._jwrite.queryName(queryName)
+ return self
+
+ @keyword_only
+ @since(2.0)
+ def trigger(self, processingTime=None):
+ """Set the trigger for the stream query. If this is not set it will run the query as fast
+ as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+
+ .. note:: Experimental.
+
+ :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+
+ >>> # trigger the query for execution every 5 seconds
+ >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
+ """
+ from pyspark.sql.streaming import ProcessingTime
+ trigger = None
+ if processingTime is not None:
+ if type(processingTime) != str or len(processingTime.strip()) == 0:
+ raise ValueError('The processing time must be a non empty string. Got: %s' %
+ processingTime)
+ trigger = ProcessingTime(processingTime)
+ if trigger is None:
+ raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
+ self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
+ return self
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+ """Streams the contents of the :class:`DataFrame` to a data source.
+
+ The data source is specified by the ``format`` and a set of ``options``.
+ If ``format`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ .. note:: Experimental.
+
+ :param path: the path in a Hadoop supported file system
+        :param format: the format used to save
+ :param partitionBy: names of partitioning columns
+ :param queryName: unique name for the query
+        :param options: All other string options. You may want to provide a `checkpointLocation`
+            for most streams; however, it is not required for a `memory` stream.
+
+ >>> cq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> cq.isActive
+ True
+ >>> cq.name
+ u'this_query'
+ >>> cq.stop()
+ >>> cq.isActive
+ False
+ >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+ ... queryName='that_query', format='memory')
+ >>> cq.name
+ u'that_query'
+ >>> cq.isActive
+ True
+ >>> cq.stop()
+ """
+ self.options(**options)
+ if partitionBy is not None:
+ self.partitionBy(partitionBy)
+ if format is not None:
+ self.format(format)
+ if queryName is not None:
+ self.queryName(queryName)
+ if path is None:
+ return self._cq(self._jwrite.start())
+ else:
+ return self._cq(self._jwrite.start(path))
+
+
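Putting the reader and writer together, a hedged end-to-end sketch built from the doctests above (memory sink, so no checkpointLocation is needed; the query name doubles as the in-memory table name; timings are approximate):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("end_to_end_sketch").getOrCreate()

    sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")

    cq = (sdf.writeStream
             .format("memory")
             .queryName("end_to_end_sketch")
             .outputMode("append")
             .trigger(processingTime="5 seconds")
             .start())

    cq.awaitTermination(10)    # block up to ~10 seconds; returns False while still running
    spark.sql("SELECT count(*) FROM end_to_end_sketch").show()
    cq.stop()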
def _test():
import doctest
import os
@@ -929,7 +1285,7 @@ def _test():
globs['spark'] = spark
globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
globs['sdf'] = \
- spark.read.format('text').stream('python/test_support/sql/streaming')
+ spark.readStream.format('text').load('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index f0bf0923b8..11c815dd94 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -31,7 +31,7 @@ from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.sql.catalog import Catalog
from pyspark.sql.conf import RuntimeConfig
from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
from pyspark.sql.types import Row, DataType, StringType, StructType, _verify_type, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
from pyspark.sql.utils import install_exception_handler
@@ -551,11 +551,26 @@ class SparkSession(object):
@property
@since(2.0)
+ def readStream(self):
+ """
+ Returns a :class:`DataStreamReader` that can be used to read data streams
+ as a streaming :class:`DataFrame`.
+
+ .. note:: Experimental.
+
+ :return: :class:`DataStreamReader`
+ """
+ return DataStreamReader(self._wrapped)
+
+ @property
+ @since(2.0)
def streams(self):
"""Returns a :class:`ContinuousQueryManager` that allows managing all the
:class:`ContinuousQuery` ContinuousQueries active on `this` context.
.. note:: Experimental.
+
+ :return: :class:`ContinuousQueryManager`
"""
from pyspark.sql.streaming import ContinuousQueryManager
return ContinuousQueryManager(self._jsparkSession.streams())
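A short sketch of the session-level entry points touched here: readStream to build the streaming DataFrame and streams to look up and manage the resulting queries (the query name is arbitrary):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("session_sketch").getOrCreate()

    sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
    cq = sdf.writeStream.format("memory").queryName("session_sketch_q").start()

    # spark.streams returns the ContinuousQueryManager for this session.
    print([q.name for q in spark.streams.active])   # includes u'session_sketch_q'
    spark.streams.get(cq.id).stop()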
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index bb4e62cdd6..0edaa51549 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -119,7 +119,7 @@ class ContinuousQueryManager(object):
def active(self):
"""Returns a list of active queries associated with this SQLContext
- >>> cq = df.write.format('memory').queryName('this_query').startStream()
+ >>> cq = df.writeStream.format('memory').queryName('this_query').start()
>>> cqm = spark.streams
>>> # get the list of active continuous queries
>>> [q.name for q in cqm.active]
@@ -134,7 +134,7 @@ class ContinuousQueryManager(object):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> cq = df.write.format('memory').queryName('this_query').startStream()
+ >>> cq = df.writeStream.format('memory').queryName('this_query').start()
>>> cq.name
u'this_query'
>>> cq = spark.streams.get(cq.id)
@@ -236,7 +236,7 @@ def _test():
globs = pyspark.sql.streaming.__dict__.copy()
try:
- spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+ spark = SparkSession.builder.getOrCreate()
except py4j.protocol.Py4JError:
spark = SparkSession(sc)
@@ -245,7 +245,7 @@ def _test():
globs['spark'] = spark
globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
globs['df'] = \
- globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
+ globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
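For completeness, a hedged sketch of blocking on the manager rather than on an individual query, modelled on test_query_manager_await_termination in the tests below:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("manager_sketch").getOrCreate()

    sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
    cq = sdf.writeStream.format("memory").queryName("manager_sketch_q").start()
    try:
        # Wait up to ~2 seconds for *any* active query to terminate;
        # returns False on timeout while queries are still running.
        print(spark.streams.awaitAnyTermination(2))
    finally:
        cq.stop()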
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index e0acde6783..fee960a1a7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -892,9 +892,9 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_trigger_takes_keyword_args(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
try:
- df.write.trigger('5 seconds')
+ df.writeStream.trigger('5 seconds')
self.fail("Should have thrown an exception")
except TypeError:
# should throw error
@@ -902,22 +902,25 @@ class SQLTests(ReusedPySparkTestCase):
def test_stream_read_options(self):
schema = StructType([StructField("data", StringType(), False)])
- df = self.spark.read.format('text').option('path', 'python/test_support/sql/streaming')\
- .schema(schema).stream()
+ df = self.spark.readStream\
+ .format('text')\
+ .option('path', 'python/test_support/sql/streaming')\
+ .schema(schema)\
+ .load()
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)])
schema = StructType([StructField("data", StringType(), False)])
- df = self.spark.read.format('csv').option('path', 'python/test_support/sql/fake') \
- .schema(bad_schema).stream(path='python/test_support/sql/streaming',
- schema=schema, format='text')
+ df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
+ .schema(bad_schema)\
+ .load(path='python/test_support/sql/streaming', schema=schema, format='text')
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_save_options(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -925,8 +928,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
- .format('parquet').outputMode('append').option('path', out).startStream()
+ cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+ .format('parquet').outputMode('append').option('path', out).start()
try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)
@@ -941,7 +944,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_save_options_overwrite(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -951,9 +954,10 @@ class SQLTests(ReusedPySparkTestCase):
chk = os.path.join(tmpPath, 'chk')
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
- cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
+ cq = df.writeStream.option('checkpointLocation', fake1)\
+ .format('memory').option('path', fake2) \
.queryName('fake_query').outputMode('append') \
- .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertEqual(cq.name, 'this_query')
@@ -971,7 +975,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_await_termination(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -979,8 +983,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ cq = df.writeStream\
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertTrue(cq.isActive)
try:
@@ -999,7 +1003,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_query_manager_await_termination(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -1007,8 +1011,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ cq = df.writeStream\
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertTrue(cq.isActive)
try: