author     Tathagata Das <tathagata.das1565@gmail.com>    2016-06-28 22:07:11 -0700
committer  Shixiong Zhu <shixiong@databricks.com>         2016-06-28 22:07:11 -0700
commit     f454a7f9f03807dd768319798daa1351bbfc7288 (patch)
tree       d6a8faadf29cfb5d2496f89187b5316375257fa2 /python
parent     153c2f9ac12846367a09684fd875c496d350a603 (diff)
download   spark-f454a7f9f03807dd768319798daa1351bbfc7288.tar.gz
           spark-f454a7f9f03807dd768319798daa1351bbfc7288.tar.bz2
           spark-f454a7f9f03807dd768319798daa1351bbfc7288.zip
[SPARK-16266][SQL][STREAMING] Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming
## What changes were proposed in this pull request?

- Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming to make them consistent with the Scala packaging
- Exposed the necessary classes in the sql.streaming package so that they appear in the docs
- Added the pyspark.sql.streaming module to the docs

## How was this patch tested?

- Updated unit tests.
- Generated docs for testing visibility of pyspark.sql.streaming classes.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13955 from tdas/SPARK-16266.
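For downstream code that referenced these classes by module path, only the import location changes; the public entry points `spark.readStream` and `df.writeStream` are unaffected. A minimal before/after sketch (illustrative, not part of the diff below):

```python
# Before this patch the streaming reader/writer lived in readwriter:
#   from pyspark.sql.readwriter import DataStreamReader, DataStreamWriter

# After this patch they are imported from the streaming module,
# matching the Scala package layout:
from pyspark.sql.streaming import DataStreamReader, DataStreamWriter
```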
Diffstat (limited to 'python')
-rw-r--r--  python/docs/pyspark.sql.rst      |   6
-rw-r--r--  python/pyspark/sql/context.py    |   3
-rw-r--r--  python/pyspark/sql/dataframe.py  |   3
-rw-r--r--  python/pyspark/sql/readwriter.py | 493
-rw-r--r--  python/pyspark/sql/session.py    |   3
-rw-r--r--  python/pyspark/sql/streaming.py  | 502
6 files changed, 511 insertions, 499 deletions
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 6259379ed0..3be9533c12 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -21,3 +21,9 @@ pyspark.sql.functions module
.. automodule:: pyspark.sql.functions
:members:
:undoc-members:
+
+pyspark.sql.streaming module
+----------------------------
+.. automodule:: pyspark.sql.streaming
+ :members:
+ :undoc-members:
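Sphinx's ``automodule`` directive documents the module's exported members, so a quick way to sanity-check what the new stanza will list is to inspect the module's ``__all__`` (a sketch, assuming a local pyspark build is importable):

```python
# Minimal sketch: confirm the classes the new automodule stanza should document
# are actually exported by the module (assumes pyspark is on the Python path).
import pyspark.sql.streaming as streaming

print(streaming.__all__)
# Expected after this patch:
# ['StreamingQuery', 'StreamingQueryManager', 'DataStreamReader', 'DataStreamWriter']
```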
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index b5dde13ed7..3503fb90c3 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -26,7 +26,8 @@ from pyspark import since
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.session import _monkey_patch_RDD, SparkSession
from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
+from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.streaming import DataStreamReader
from pyspark.sql.types import Row, StringType
from pyspark.sql.utils import install_exception_handler
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4f13307820..e44b01bba9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -33,7 +33,8 @@ from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.sql.types import _parse_datatype_json_string
from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
-from pyspark.sql.readwriter import DataFrameWriter, DataStreamWriter
+from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.types import *
__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
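The context.py and dataframe.py hunks above (and the session.py hunk further below) only swap the import source; the accessors that hand these classes to users keep the same shape. A hedged sketch of that wiring, using illustrative stand-in classes rather than the real pyspark ones:

```python
from pyspark.sql.streaming import DataStreamReader, DataStreamWriter

class MyDataFrame(object):
    """Illustrative stand-in for pyspark.sql.dataframe.DataFrame."""
    @property
    def writeStream(self):
        # DataStreamWriter is now resolved from pyspark.sql.streaming; the real
        # DataFrame passes itself (a JVM-backed object) to the writer.
        return DataStreamWriter(self)

class MySparkSession(object):
    """Illustrative stand-in for pyspark.sql.session.SparkSession."""
    @property
    def readStream(self):
        # DataStreamReader is likewise resolved from pyspark.sql.streaming.
        return DataStreamReader(self)
```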
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3f28d7ad50..10f307b987 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
from pyspark.sql import utils
-__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter"]
def to_str(value):
@@ -724,494 +724,6 @@ class DataFrameWriter(OptionUtils):
self._jwrite.mode(mode).jdbc(url, table, jprop)
-class DataStreamReader(OptionUtils):
- """
- Interface used to load a streaming :class:`DataFrame` from external storage systems
- (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
- to access this.
-
- .. note:: Experimental.
-
- .. versionadded:: 2.0
- """
-
- def __init__(self, spark):
- self._jreader = spark._ssql_ctx.readStream()
- self._spark = spark
-
- def _df(self, jdf):
- from pyspark.sql.dataframe import DataFrame
- return DataFrame(jdf, self._spark)
-
- @since(2.0)
- def format(self, source):
- """Specifies the input data source format.
-
- .. note:: Experimental.
-
- :param source: string, name of the data source, e.g. 'json', 'parquet'.
-
- >>> s = spark.readStream.format("text")
- """
- self._jreader = self._jreader.format(source)
- return self
-
- @since(2.0)
- def schema(self, schema):
- """Specifies the input schema.
-
- Some data sources (e.g. JSON) can infer the input schema automatically from data.
- By specifying the schema here, the underlying data source can skip the schema
- inference step, and thus speed up data loading.
-
- .. note:: Experimental.
-
- :param schema: a StructType object
-
- >>> s = spark.readStream.schema(sdf_schema)
- """
- if not isinstance(schema, StructType):
- raise TypeError("schema should be StructType")
- jschema = self._spark._ssql_ctx.parseDataType(schema.json())
- self._jreader = self._jreader.schema(jschema)
- return self
-
- @since(2.0)
- def option(self, key, value):
- """Adds an input option for the underlying data source.
-
- .. note:: Experimental.
-
- >>> s = spark.readStream.option("x", 1)
- """
- self._jreader = self._jreader.option(key, to_str(value))
- return self
-
- @since(2.0)
- def options(self, **options):
- """Adds input options for the underlying data source.
-
- .. note:: Experimental.
-
- >>> s = spark.readStream.options(x="1", y=2)
- """
- for k in options:
- self._jreader = self._jreader.option(k, to_str(options[k]))
- return self
-
- @since(2.0)
- def load(self, path=None, format=None, schema=None, **options):
- """Loads a data stream from a data source and returns it as a :class:`DataFrame`.
-
- .. note:: Experimental.
-
- :param path: optional string for file-system backed data sources.
- :param format: optional string for format of the data source. Defaults to 'parquet'.
- :param schema: optional :class:`StructType` for the input schema.
- :param options: all other string options
-
- >>> json_sdf = spark.readStream.format("json")\
- .schema(sdf_schema)\
- .load(os.path.join(tempfile.mkdtemp(),'data'))
- >>> json_sdf.isStreaming
- True
- >>> json_sdf.schema == sdf_schema
- True
- """
- if format is not None:
- self.format(format)
- if schema is not None:
- self.schema(schema)
- self.options(**options)
- if path is not None:
- if type(path) != str or len(path.strip()) == 0:
- raise ValueError("If the path is provided for stream, it needs to be a " +
- "non-empty string. List of paths are not supported.")
- return self._df(self._jreader.load(path))
- else:
- return self._df(self._jreader.load())
-
- @since(2.0)
- def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
- allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
- allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
- """
- Loads a JSON file stream (one object per line) and returns a :class:`DataFrame`.
-
- If the ``schema`` parameter is not specified, this function goes
- through the input once to determine the input schema.
-
- .. note:: Experimental.
-
- :param path: string represents path to the JSON dataset,
- or RDD of Strings storing JSON objects.
- :param schema: an optional :class:`StructType` for the input schema.
- :param primitivesAsString: infers all primitive values as a string type. If None is set,
- it uses the default value, ``false``.
- :param prefersDecimal: infers all floating-point values as a decimal type. If the values
- do not fit in decimal, then it infers them as doubles. If None is
- set, it uses the default value, ``false``.
- :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
- it uses the default value, ``false``.
- :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
- it uses the default value, ``false``.
- :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
- set, it uses the default value, ``true``.
- :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
- set, it uses the default value, ``false``.
- :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
- using backslash quoting mechanism. If None is
- set, it uses the default value, ``false``.
- :param mode: allows a mode for dealing with corrupt records during parsing. If None is
- set, it uses the default value, ``PERMISSIVE``.
-
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
- record and puts the malformed string into a new field configured by \
- ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
- ``null`` for extra fields.
- * ``DROPMALFORMED`` : ignores the whole corrupted records.
- * ``FAILFAST`` : throws an exception when it meets corrupted records.
-
- :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
- created by ``PERMISSIVE`` mode. This overrides
- ``spark.sql.columnNameOfCorruptRecord``. If None is set,
- it uses the value specified in
- ``spark.sql.columnNameOfCorruptRecord``.
-
- >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \
- schema = sdf_schema)
- >>> json_sdf.isStreaming
- True
- >>> json_sdf.schema == sdf_schema
- True
- """
- self._set_opts(
- schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
- allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
- allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
- allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
- if isinstance(path, basestring):
- return self._df(self._jreader.json(path))
- else:
- raise TypeError("path can be only a single string")
-
- @since(2.0)
- def parquet(self, path):
- """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
-
- You can set the following Parquet-specific option(s) for reading Parquet files:
- * ``mergeSchema``: sets whether we should merge schemas collected from all \
- Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
- The default value is specified in ``spark.sql.parquet.mergeSchema``.
-
- .. note:: Experimental.
-
- >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
- .parquet(os.path.join(tempfile.mkdtemp()))
- >>> parquet_sdf.isStreaming
- True
- >>> parquet_sdf.schema == sdf_schema
- True
- """
- if isinstance(path, basestring):
- return self._df(self._jreader.parquet(path))
- else:
- raise TypeError("path can be only a single string")
-
- @ignore_unicode_prefix
- @since(2.0)
- def text(self, path):
- """
- Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
- string column named "value", followed by partitioned columns if there
- are any.
-
- Each line in the text file is a new row in the resulting DataFrame.
-
- .. note:: Experimental.
-
- :param path: string, or list of strings, for input path(s).
-
- >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
- >>> text_sdf.isStreaming
- True
- >>> "value" in str(text_sdf.schema)
- True
- """
- if isinstance(path, basestring):
- return self._df(self._jreader.text(path))
- else:
- raise TypeError("path can be only a single string")
-
- @since(2.0)
- def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
- comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
- ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
- """Loads a CSV file stream and returns the result as a :class:`DataFrame`.
-
- This function will go through the input once to determine the input schema if
- ``inferSchema`` is enabled. To avoid going through the entire data once, disable
- ``inferSchema`` option or specify the schema explicitly using ``schema``.
-
- .. note:: Experimental.
-
- :param path: string, or list of strings, for input path(s).
- :param schema: an optional :class:`StructType` for the input schema.
- :param sep: sets the single character as a separator for each field and value.
- If None is set, it uses the default value, ``,``.
- :param encoding: decodes the CSV files by the given encoding type. If None is set,
- it uses the default value, ``UTF-8``.
- :param quote: sets the single character used for escaping quoted values where the
- separator can be part of the value. If None is set, it uses the default
- value, ``"``. If you would like to turn off quotations, you need to set an
- empty string.
- :param escape: sets the single character used for escaping quotes inside an already
- quoted value. If None is set, it uses the default value, ``\``.
- :param comment: sets the single character used for skipping lines beginning with this
- character. By default (None), it is disabled.
- :param header: uses the first line as names of columns. If None is set, it uses the
- default value, ``false``.
- :param inferSchema: infers the input schema automatically from data. It requires one extra
- pass over the data. If None is set, it uses the default value, ``false``.
- :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
- being read should be skipped. If None is set, it uses
- the default value, ``false``.
- :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
- being read should be skipped. If None is set, it uses
- the default value, ``false``.
- :param nullValue: sets the string representation of a null value. If None is set, it uses
- the default value, empty string.
- :param nanValue: sets the string representation of a non-number value. If None is set, it
- uses the default value, ``NaN``.
- :param positiveInf: sets the string representation of a positive infinity value. If None
- is set, it uses the default value, ``Inf``.
- :param negativeInf: sets the string representation of a negative infinity value. If None
- is set, it uses the default value, ``Inf``.
- :param dateFormat: sets the string that indicates a date format. Custom date formats
- follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
- :param maxColumns: defines a hard limit of how many columns a record can have. If None is
- set, it uses the default value, ``20480``.
- :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
- value being read. If None is set, it uses the default value,
- ``1000000``.
- :param mode: allows a mode for dealing with corrupt records during parsing. If None is
- set, it uses the default value, ``PERMISSIVE``.
-
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
- When a schema is set by user, it sets ``null`` for extra fields.
- * ``DROPMALFORMED`` : ignores the whole corrupted records.
- * ``FAILFAST`` : throws an exception when it meets corrupted records.
-
- >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \
- schema = sdf_schema)
- >>> csv_sdf.isStreaming
- True
- >>> csv_sdf.schema == sdf_schema
- True
- """
- self._set_opts(
- schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
- header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
- ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
- nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
- if isinstance(path, basestring):
- return self._df(self._jreader.csv(path))
- else:
- raise TypeError("path can be only a single string")
-
-
-class DataStreamWriter(object):
- """
- Interface used to write a streaming :class:`DataFrame` to external storage systems
- (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
- to access this.
-
- .. note:: Experimental.
-
- .. versionadded:: 2.0
- """
-
- def __init__(self, df):
- self._df = df
- self._spark = df.sql_ctx
- self._jwrite = df._jdf.writeStream()
-
- def _sq(self, jsq):
- from pyspark.sql.streaming import StreamingQuery
- return StreamingQuery(jsq)
-
- @since(2.0)
- def outputMode(self, outputMode):
- """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
-
- Options include:
-
- * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
- the sink
- * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
- every time there are some updates
-
- .. note:: Experimental.
-
- >>> writer = sdf.writeStream.outputMode('append')
- """
- if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
- raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
- self._jwrite = self._jwrite.outputMode(outputMode)
- return self
-
- @since(2.0)
- def format(self, source):
- """Specifies the underlying output data source.
-
- .. note:: Experimental.
-
- :param source: string, name of the data source, e.g. 'json', 'parquet'.
-
- >>> writer = sdf.writeStream.format('json')
- """
- self._jwrite = self._jwrite.format(source)
- return self
-
- @since(2.0)
- def option(self, key, value):
- """Adds an output option for the underlying data source.
-
- .. note:: Experimental.
- """
- self._jwrite = self._jwrite.option(key, to_str(value))
- return self
-
- @since(2.0)
- def options(self, **options):
- """Adds output options for the underlying data source.
-
- .. note:: Experimental.
- """
- for k in options:
- self._jwrite = self._jwrite.option(k, to_str(options[k]))
- return self
-
- @since(2.0)
- def partitionBy(self, *cols):
- """Partitions the output by the given columns on the file system.
-
- If specified, the output is laid out on the file system similar
- to Hive's partitioning scheme.
-
- .. note:: Experimental.
-
- :param cols: name of columns
-
- """
- if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
- cols = cols[0]
- self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
- return self
-
- @since(2.0)
- def queryName(self, queryName):
- """Specifies the name of the :class:`StreamingQuery` that can be started with
- :func:`start`. This name must be unique among all the currently active queries
- in the associated SparkSession.
-
- .. note:: Experimental.
-
- :param queryName: unique name for the query
-
- >>> writer = sdf.writeStream.queryName('streaming_query')
- """
- if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
- raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
- self._jwrite = self._jwrite.queryName(queryName)
- return self
-
- @keyword_only
- @since(2.0)
- def trigger(self, processingTime=None):
- """Set the trigger for the stream query. If this is not set it will run the query as fast
- as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
-
- .. note:: Experimental.
-
- :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
-
- >>> # trigger the query for execution every 5 seconds
- >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
- """
- from pyspark.sql.streaming import ProcessingTime
- trigger = None
- if processingTime is not None:
- if type(processingTime) != str or len(processingTime.strip()) == 0:
- raise ValueError('The processing time must be a non empty string. Got: %s' %
- processingTime)
- trigger = ProcessingTime(processingTime)
- if trigger is None:
- raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
- self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
- return self
-
- @ignore_unicode_prefix
- @since(2.0)
- def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
- """Streams the contents of the :class:`DataFrame` to a data source.
-
- The data source is specified by the ``format`` and a set of ``options``.
- If ``format`` is not specified, the default data source configured by
- ``spark.sql.sources.default`` will be used.
-
- .. note:: Experimental.
-
- :param path: the path in a Hadoop supported file system
- :param format: the format used to save
-
- * ``append``: Append contents of this :class:`DataFrame` to existing data.
- * ``overwrite``: Overwrite existing data.
- * ``ignore``: Silently ignore this operation if data already exists.
- * ``error`` (default case): Throw an exception if data already exists.
- :param partitionBy: names of partitioning columns
- :param queryName: unique name for the query
- :param options: All other string options. You may want to provide a `checkpointLocation`
- for most streams, however it is not required for a `memory` stream.
-
- >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
- >>> sq.isActive
- True
- >>> sq.name
- u'this_query'
- >>> sq.stop()
- >>> sq.isActive
- False
- >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
- ... queryName='that_query', format='memory')
- >>> sq.name
- u'that_query'
- >>> sq.isActive
- True
- >>> sq.stop()
- """
- self.options(**options)
- if partitionBy is not None:
- self.partitionBy(partitionBy)
- if format is not None:
- self.format(format)
- if queryName is not None:
- self.queryName(queryName)
- if path is None:
- return self._sq(self._jwrite.start())
- else:
- return self._sq(self._jwrite.start(path))
-
-
def _test():
import doctest
import os
@@ -1235,9 +747,6 @@ def _test():
globs['sc'] = sc
globs['spark'] = spark
globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
- globs['sdf'] = \
- spark.readStream.format('text').load('python/test_support/sql/streaming')
- globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index b4152a34ad..55f86a16f5 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -31,7 +31,8 @@ from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.sql.catalog import Catalog
from pyspark.sql.conf import RuntimeConfig
from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
+from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.streaming import DataStreamReader
from pyspark.sql.types import Row, DataType, StringType, StructType, _verify_type, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
from pyspark.sql.utils import install_exception_handler
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ae45c99e4f..8cf70983a4 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -18,15 +18,18 @@
import sys
if sys.version >= '3':
intlike = int
+ basestring = unicode = str
else:
intlike = (int, long)
from abc import ABCMeta, abstractmethod
-from pyspark import since
+from pyspark import since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.readwriter import OptionUtils, to_str
+from pyspark.sql.types import *
-__all__ = ["StreamingQuery"]
+__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
class StreamingQuery(object):
@@ -118,7 +121,7 @@ class StreamingQueryManager(object):
def active(self):
"""Returns a list of active queries associated with this SQLContext
- >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sqm = spark.streams
>>> # get the list of active streaming queries
>>> [q.name for q in sqm.active]
@@ -133,7 +136,7 @@ class StreamingQueryManager(object):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> sq = df.writeStream.format('memory').queryName('this_query').start()
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sq.name
u'this_query'
>>> sq = spark.streams.get(sq.id)
@@ -224,6 +227,494 @@ class ProcessingTime(Trigger):
self.interval)
+class DataStreamReader(OptionUtils):
+ """
+ Interface used to load a streaming :class:`DataFrame` from external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, spark):
+ self._jreader = spark._ssql_ctx.readStream()
+ self._spark = spark
+
+ def _df(self, jdf):
+ from pyspark.sql.dataframe import DataFrame
+ return DataFrame(jdf, self._spark)
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the input data source format.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ >>> s = spark.readStream.format("text")
+ """
+ self._jreader = self._jreader.format(source)
+ return self
+
+ @since(2.0)
+ def schema(self, schema):
+ """Specifies the input schema.
+
+ Some data sources (e.g. JSON) can infer the input schema automatically from data.
+ By specifying the schema here, the underlying data source can skip the schema
+ inference step, and thus speed up data loading.
+
+ .. note:: Experimental.
+
+ :param schema: a StructType object
+
+ >>> s = spark.readStream.schema(sdf_schema)
+ """
+ if not isinstance(schema, StructType):
+ raise TypeError("schema should be StructType")
+ jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+ self._jreader = self._jreader.schema(jschema)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an input option for the underlying data source.
+
+ .. note:: Experimental.
+
+ >>> s = spark.readStream.option("x", 1)
+ """
+ self._jreader = self._jreader.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds input options for the underlying data source.
+
+ .. note:: Experimental.
+
+ >>> s = spark.readStream.options(x="1", y=2)
+ """
+ for k in options:
+ self._jreader = self._jreader.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def load(self, path=None, format=None, schema=None, **options):
+ """Loads a data stream from a data source and returns it as a :class:`DataFrame`.
+
+ .. note:: Experimental.
+
+ :param path: optional string for file-system backed data sources.
+ :param format: optional string for format of the data source. Defaults to 'parquet'.
+ :param schema: optional :class:`StructType` for the input schema.
+ :param options: all other string options
+
+ >>> json_sdf = spark.readStream.format("json")\
+ .schema(sdf_schema)\
+ .load(os.path.join(tempfile.mkdtemp(),'data'))
+ >>> json_sdf.isStreaming
+ True
+ >>> json_sdf.schema == sdf_schema
+ True
+ """
+ if format is not None:
+ self.format(format)
+ if schema is not None:
+ self.schema(schema)
+ self.options(**options)
+ if path is not None:
+ if type(path) != str or len(path.strip()) == 0:
+ raise ValueError("If the path is provided for stream, it needs to be a " +
+ "non-empty string. List of paths are not supported.")
+ return self._df(self._jreader.load(path))
+ else:
+ return self._df(self._jreader.load())
+
+ @since(2.0)
+ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+ allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
+ """
+ Loads a JSON file stream (one object per line) and returns a :class:`DataFrame`.
+
+ If the ``schema`` parameter is not specified, this function goes
+ through the input once to determine the input schema.
+
+ .. note:: Experimental.
+
+ :param path: string represents path to the JSON dataset,
+ or RDD of Strings storing JSON objects.
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param primitivesAsString: infers all primitive values as a string type. If None is set,
+ it uses the default value, ``false``.
+ :param prefersDecimal: infers all floating-point values as a decimal type. If the values
+ do not fit in decimal, then it infers them as doubles. If None is
+ set, it uses the default value, ``false``.
+ :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
+ it uses the default value, ``false``.
+ :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
+ it uses the default value, ``false``.
+ :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
+ set, it uses the default value, ``true``.
+ :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
+ set, it uses the default value, ``false``.
+ :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
+ using backslash quoting mechanism. If None is
+ set, it uses the default value, ``false``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record and puts the malformed string into a new field configured by \
+ ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+ ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets corrupted records.
+
+ :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+ created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
+ >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \
+ schema = sdf_schema)
+ >>> json_sdf.isStreaming
+ True
+ >>> json_sdf.schema == sdf_schema
+ True
+ """
+ self._set_opts(
+ schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+ allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+ allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+ allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ if isinstance(path, basestring):
+ return self._df(self._jreader.json(path))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def parquet(self, path):
+ """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+
+ You can set the following Parquet-specific option(s) for reading Parquet files:
+ * ``mergeSchema``: sets whether we should merge schemas collected from all \
+ Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
+ The default value is specified in ``spark.sql.parquet.mergeSchema``.
+
+ .. note:: Experimental.
+
+ >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
+ .parquet(os.path.join(tempfile.mkdtemp()))
+ >>> parquet_sdf.isStreaming
+ True
+ >>> parquet_sdf.schema == sdf_schema
+ True
+ """
+ if isinstance(path, basestring):
+ return self._df(self._jreader.parquet(path))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def text(self, path):
+ """
+ Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
+ string column named "value", followed by partitioned columns if there
+ are any.
+
+ Each line in the text file is a new row in the resulting DataFrame.
+
+ .. note:: Experimental.
+
+ :param path: string, or list of strings, for input path(s).
+
+ >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+ >>> text_sdf.isStreaming
+ True
+ >>> "value" in str(text_sdf.schema)
+ True
+ """
+ if isinstance(path, basestring):
+ return self._df(self._jreader.text(path))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
+ comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
+ ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+ maxMalformedLogPerPartition=None, mode=None):
+ """Loads a CSV file stream and returns the result as a :class:`DataFrame`.
+
+ This function will go through the input once to determine the input schema if
+ ``inferSchema`` is enabled. To avoid going through the entire data once, disable
+ ``inferSchema`` option or specify the schema explicitly using ``schema``.
+
+ .. note:: Experimental.
+
+ :param path: string, or list of strings, for input path(s).
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param sep: sets the single character as a separator for each field and value.
+ If None is set, it uses the default value, ``,``.
+ :param encoding: decodes the CSV files by the given encoding type. If None is set,
+ it uses the default value, ``UTF-8``.
+ :param quote: sets the single character used for escaping quoted values where the
+ separator can be part of the value. If None is set, it uses the default
+ value, ``"``. If you would like to turn off quotations, you need to set an
+ empty string.
+ :param escape: sets the single character used for escaping quotes inside an already
+ quoted value. If None is set, it uses the default value, ``\``.
+ :param comment: sets the single character used for skipping lines beginning with this
+ character. By default (None), it is disabled.
+ :param header: uses the first line as names of columns. If None is set, it uses the
+ default value, ``false``.
+ :param inferSchema: infers the input schema automatically from data. It requires one extra
+ pass over the data. If None is set, it uses the default value, ``false``.
+ :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param nullValue: sets the string representation of a null value. If None is set, it uses
+ the default value, empty string.
+ :param nanValue: sets the string representation of a non-number value. If None is set, it
+ uses the default value, ``NaN``.
+ :param positiveInf: sets the string representation of a positive infinity value. If None
+ is set, it uses the default value, ``Inf``.
+ :param negativeInf: sets the string representation of a negative infinity value. If None
+ is set, it uses the default value, ``Inf``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to both date type and timestamp type. By default, it is None
+ which means trying to parse times and date by
+ ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ :param maxColumns: defines a hard limit of how many columns a record can have. If None is
+ set, it uses the default value, ``20480``.
+ :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
+ value being read. If None is set, it uses the default value,
+ ``1000000``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
+ When a schema is set by user, it sets ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets corrupted records.
+
+ >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \
+ schema = sdf_schema)
+ >>> csv_sdf.isStreaming
+ True
+ >>> csv_sdf.schema == sdf_schema
+ True
+ """
+ self._set_opts(
+ schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+ header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+ ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+ nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+ dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ if isinstance(path, basestring):
+ return self._df(self._jreader.csv(path))
+ else:
+ raise TypeError("path can be only a single string")
+
+
+class DataStreamWriter(object):
+ """
+ Interface used to write a streaming :class:`DataFrame` to external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, df):
+ self._df = df
+ self._spark = df.sql_ctx
+ self._jwrite = df._jdf.writeStream()
+
+ def _sq(self, jsq):
+ from pyspark.sql.streaming import StreamingQuery
+ return StreamingQuery(jsq)
+
+ @since(2.0)
+ def outputMode(self, outputMode):
+ """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+
+ Options include:
+
+ * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
+ the sink
+ * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
+ every time there are some updates
+
+ .. note:: Experimental.
+
+ >>> writer = sdf.writeStream.outputMode('append')
+ """
+ if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
+ raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
+ self._jwrite = self._jwrite.outputMode(outputMode)
+ return self
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the underlying output data source.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ >>> writer = sdf.writeStream.format('json')
+ """
+ self._jwrite = self._jwrite.format(source)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an output option for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ self._jwrite = self._jwrite.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds output options for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ for k in options:
+ self._jwrite = self._jwrite.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def partitionBy(self, *cols):
+ """Partitions the output by the given columns on the file system.
+
+ If specified, the output is laid out on the file system similar
+ to Hive's partitioning scheme.
+
+ .. note:: Experimental.
+
+ :param cols: name of columns
+
+ """
+ if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+ cols = cols[0]
+ self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
+ return self
+
+ @since(2.0)
+ def queryName(self, queryName):
+ """Specifies the name of the :class:`StreamingQuery` that can be started with
+ :func:`start`. This name must be unique among all the currently active queries
+ in the associated SparkSession.
+
+ .. note:: Experimental.
+
+ :param queryName: unique name for the query
+
+ >>> writer = sdf.writeStream.queryName('streaming_query')
+ """
+ if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
+ raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
+ self._jwrite = self._jwrite.queryName(queryName)
+ return self
+
+ @keyword_only
+ @since(2.0)
+ def trigger(self, processingTime=None):
+ """Set the trigger for the stream query. If this is not set it will run the query as fast
+ as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+
+ .. note:: Experimental.
+
+ :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+
+ >>> # trigger the query for execution every 5 seconds
+ >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
+ """
+ from pyspark.sql.streaming import ProcessingTime
+ trigger = None
+ if processingTime is not None:
+ if type(processingTime) != str or len(processingTime.strip()) == 0:
+ raise ValueError('The processing time must be a non empty string. Got: %s' %
+ processingTime)
+ trigger = ProcessingTime(processingTime)
+ if trigger is None:
+ raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
+ self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
+ return self
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+ """Streams the contents of the :class:`DataFrame` to a data source.
+
+ The data source is specified by the ``format`` and a set of ``options``.
+ If ``format`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ .. note:: Experimental.
+
+ :param path: the path in a Hadoop supported file system
+ :param format: the format used to save
+
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
+ :param partitionBy: names of partitioning columns
+ :param queryName: unique name for the query
+ :param options: All other string options. You may want to provide a `checkpointLocation`
+ for most streams, however it is not required for a `memory` stream.
+
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.isActive
+ True
+ >>> sq.name
+ u'this_query'
+ >>> sq.stop()
+ >>> sq.isActive
+ False
+ >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+ ... queryName='that_query', format='memory')
+ >>> sq.name
+ u'that_query'
+ >>> sq.isActive
+ True
+ >>> sq.stop()
+ """
+ self.options(**options)
+ if partitionBy is not None:
+ self.partitionBy(partitionBy)
+ if format is not None:
+ self.format(format)
+ if queryName is not None:
+ self.queryName(queryName)
+ if path is None:
+ return self._sq(self._jwrite.start())
+ else:
+ return self._sq(self._jwrite.start(path))
+
+
def _test():
import doctest
import os
@@ -243,6 +734,9 @@ def _test():
globs['os'] = os
globs['spark'] = spark
globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
+ globs['sdf'] = \
+ spark.readStream.format('text').load('python/test_support/sql/streaming')
+ globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
globs['df'] = \
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
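Putting the relocated reader and writer together, a hedged end-to-end sketch that mirrors the doctests above; the app name, query name, and use of the repo's test data directory are illustrative choices, not part of this commit:

```python
# Hedged end-to-end sketch mirroring the doctests above. It assumes a local
# Spark installation and a checkout of the repo (for the test data directory).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("streaming_move_example").getOrCreate()
input_path = "python/test_support/sql/streaming"  # same directory the doctests load

# Reader side: spark.readStream now returns pyspark.sql.streaming.DataStreamReader.
sdf = spark.readStream.format("text").load(input_path)
assert sdf.isStreaming

# Writer side: sdf.writeStream now returns pyspark.sql.streaming.DataStreamWriter.
query = (sdf.writeStream
         .format("memory")
         .queryName("example_query")
         .trigger(processingTime="5 seconds")
         .start())
assert query.isActive

# The memory sink exposes results as an in-memory table named after the query;
# it may be empty until the first trigger fires.
spark.sql("SELECT * FROM example_query").show()

query.stop()
assert not query.isActive
```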