aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/pyspark/sql/context.py13
-rw-r--r--python/pyspark/sql/dataframe.py18
-rw-r--r--python/pyspark/sql/readwriter.py640
-rw-r--r--python/pyspark/sql/session.py17
-rw-r--r--python/pyspark/sql/streaming.py8
-rw-r--r--python/pyspark/sql/tests.py42
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala34
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala255
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala29
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala294
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala386
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala16
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala24
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala)349
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala231
24 files changed, 1679 insertions, 758 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 60f62b219b..a271afe4cf 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -430,6 +430,19 @@ class SQLContext(object):
@property
@since(2.0)
+ def readStream(self):
+ """
+ Returns a :class:`DataStreamReader` that can be used to read data streams
+ as a streaming :class:`DataFrame`.
+
+ .. note:: Experimental.
+
+ :return: :class:`DataStreamReader`
+ """
+ return DataStreamReader(self._wrapped)
+
+ @property
+ @since(2.0)
def streams(self):
"""Returns a :class:`ContinuousQueryManager` that allows managing all the
:class:`ContinuousQuery` ContinuousQueries active on `this` context.
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4fa799ac55..0126faf574 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -33,7 +33,7 @@ from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.sql.types import _parse_datatype_json_string
from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
-from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.readwriter import DataFrameWriter, DataStreamWriter
from pyspark.sql.types import *
__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
@@ -172,13 +172,27 @@ class DataFrame(object):
@since(1.4)
def write(self):
"""
- Interface for saving the content of the :class:`DataFrame` out into external storage.
+ Interface for saving the content of the non-streaming :class:`DataFrame` out into external
+ storage.
:return: :class:`DataFrameWriter`
"""
return DataFrameWriter(self)
@property
+ @since(2.0)
+ def writeStream(self):
+ """
+ Interface for saving the content of the streaming :class:`DataFrame` out into external
+ storage.
+
+ .. note:: Experimental.
+
+ :return: :class:`DataStreamWriter`
+ """
+ return DataStreamWriter(self)
+
+ @property
@since(1.3)
def schema(self):
"""Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 0f50f672a2..ad954d0ad8 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -137,34 +137,6 @@ class DataFrameReader(object):
else:
return self._df(self._jreader.load())
- @since(2.0)
- def stream(self, path=None, format=None, schema=None, **options):
- """Loads a data stream from a data source and returns it as a :class`DataFrame`.
-
- .. note:: Experimental.
-
- :param path: optional string for file-system backed data sources.
- :param format: optional string for format of the data source. Default to 'parquet'.
- :param schema: optional :class:`StructType` for the input schema.
- :param options: all other string options
-
- >>> df = spark.read.format('text').stream('python/test_support/sql/streaming')
- >>> df.isStreaming
- True
- """
- if format is not None:
- self.format(format)
- if schema is not None:
- self.schema(schema)
- self.options(**options)
- if path is not None:
- if type(path) != str or len(path.strip()) == 0:
- raise ValueError("If the path is provided for stream, it needs to be a " +
- "non-empty string. List of paths are not supported.")
- return self._df(self._jreader.stream(path))
- else:
- return self._df(self._jreader.stream())
-
@since(1.4)
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
@@ -509,26 +481,6 @@ class DataFrameWriter(object):
self._jwrite = self._jwrite.mode(saveMode)
return self
- @since(2.0)
- def outputMode(self, outputMode):
- """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
-
- Options include:
-
- * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to
- the sink
- * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
- every time these is some updates
-
- .. note:: Experimental.
-
- >>> writer = sdf.write.outputMode('append')
- """
- if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
- raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
- self._jwrite = self._jwrite.outputMode(outputMode)
- return self
-
@since(1.4)
def format(self, source):
"""Specifies the underlying output data source.
@@ -571,48 +523,6 @@ class DataFrameWriter(object):
self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
return self
- @since(2.0)
- def queryName(self, queryName):
- """Specifies the name of the :class:`ContinuousQuery` that can be started with
- :func:`startStream`. This name must be unique among all the currently active queries
- in the associated SparkSession.
-
- .. note:: Experimental.
-
- :param queryName: unique name for the query
-
- >>> writer = sdf.write.queryName('streaming_query')
- """
- if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
- raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
- self._jwrite = self._jwrite.queryName(queryName)
- return self
-
- @keyword_only
- @since(2.0)
- def trigger(self, processingTime=None):
- """Set the trigger for the stream query. If this is not set it will run the query as fast
- as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
-
- .. note:: Experimental.
-
- :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
-
- >>> # trigger the query for execution every 5 seconds
- >>> writer = sdf.write.trigger(processingTime='5 seconds')
- """
- from pyspark.sql.streaming import ProcessingTime
- trigger = None
- if processingTime is not None:
- if type(processingTime) != str or len(processingTime.strip()) == 0:
- raise ValueError('The processing time must be a non empty string. Got: %s' %
- processingTime)
- trigger = ProcessingTime(processingTime)
- if trigger is None:
- raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
- self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
- return self
-
@since(1.4)
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
@@ -644,57 +554,6 @@ class DataFrameWriter(object):
else:
self._jwrite.save(path)
- @ignore_unicode_prefix
- @since(2.0)
- def startStream(self, path=None, format=None, partitionBy=None, queryName=None, **options):
- """Streams the contents of the :class:`DataFrame` to a data source.
-
- The data source is specified by the ``format`` and a set of ``options``.
- If ``format`` is not specified, the default data source configured by
- ``spark.sql.sources.default`` will be used.
-
- .. note:: Experimental.
-
- :param path: the path in a Hadoop supported file system
- :param format: the format used to save
-
- * ``append``: Append contents of this :class:`DataFrame` to existing data.
- * ``overwrite``: Overwrite existing data.
- * ``ignore``: Silently ignore this operation if data already exists.
- * ``error`` (default case): Throw an exception if data already exists.
- :param partitionBy: names of partitioning columns
- :param queryName: unique name for the query
- :param options: All other string options. You may want to provide a `checkpointLocation`
- for most streams, however it is not required for a `memory` stream.
-
- >>> cq = sdf.write.format('memory').queryName('this_query').startStream()
- >>> cq.isActive
- True
- >>> cq.name
- u'this_query'
- >>> cq.stop()
- >>> cq.isActive
- False
- >>> cq = sdf.write.trigger(processingTime='5 seconds').startStream(
- ... queryName='that_query', format='memory')
- >>> cq.name
- u'that_query'
- >>> cq.isActive
- True
- >>> cq.stop()
- """
- self.options(**options)
- if partitionBy is not None:
- self.partitionBy(partitionBy)
- if format is not None:
- self.format(format)
- if queryName is not None:
- self.queryName(queryName)
- if path is None:
- return self._cq(self._jwrite.startStream())
- else:
- return self._cq(self._jwrite.startStream(path))
-
@since(1.4)
def insertInto(self, tableName, overwrite=False):
"""Inserts the content of the :class:`DataFrame` to the specified table.
@@ -905,6 +764,503 @@ class DataFrameWriter(object):
self._jwrite.mode(mode).jdbc(url, table, jprop)
+class DataStreamReader(object):
+ """
+ Interface used to load a streaming :class:`DataFrame` from external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, spark):
+ self._jreader = spark._ssql_ctx.readStream()
+ self._spark = spark
+
+ def _df(self, jdf):
+ from pyspark.sql.dataframe import DataFrame
+ return DataFrame(jdf, self._spark)
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the input data source format.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ """
+ self._jreader = self._jreader.format(source)
+ return self
+
+ @since(2.0)
+ def schema(self, schema):
+ """Specifies the input schema.
+
+ Some data sources (e.g. JSON) can infer the input schema automatically from data.
+ By specifying the schema here, the underlying data source can skip the schema
+ inference step, and thus speed up data loading.
+
+ .. note:: Experimental.
+
+ :param schema: a StructType object
+ """
+ if not isinstance(schema, StructType):
+ raise TypeError("schema should be StructType")
+ jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+ self._jreader = self._jreader.schema(jschema)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an input option for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ self._jreader = self._jreader.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds input options for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ for k in options:
+ self._jreader = self._jreader.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def load(self, path=None, format=None, schema=None, **options):
+ """Loads a data stream from a data source and returns it as a :class`DataFrame`.
+
+ .. note:: Experimental.
+
+ :param path: optional string for file-system backed data sources.
+ :param format: optional string for format of the data source. Default to 'parquet'.
+ :param schema: optional :class:`StructType` for the input schema.
+ :param options: all other string options
+
+ """
+ if format is not None:
+ self.format(format)
+ if schema is not None:
+ self.schema(schema)
+ self.options(**options)
+ if path is not None:
+ if type(path) != str or len(path.strip()) == 0:
+ raise ValueError("If the path is provided for stream, it needs to be a " +
+ "non-empty string. List of paths are not supported.")
+ return self._df(self._jreader.load(path))
+ else:
+ return self._df(self._jreader.load())
+
+ @since(2.0)
+ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+ allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
+ """
+ Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
+
+ If the ``schema`` parameter is not specified, this function goes
+ through the input once to determine the input schema.
+
+ .. note:: Experimental.
+
+ :param path: string represents path to the JSON dataset,
+ or RDD of Strings storing JSON objects.
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param primitivesAsString: infers all primitive values as a string type. If None is set,
+ it uses the default value, ``false``.
+ :param prefersDecimal: infers all floating-point values as a decimal type. If the values
+ do not fit in decimal, then it infers them as doubles. If None is
+ set, it uses the default value, ``false``.
+ :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
+ it uses the default value, ``false``.
+ :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
+ it uses the default value, ``false``.
+ :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
+ set, it uses the default value, ``true``.
+ :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
+ set, it uses the default value, ``false``.
+ :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
+ using backslash quoting mechanism. If None is
+ set, it uses the default value, ``false``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record and puts the malformed string into a new field configured by \
+ ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+ ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets corrupted records.
+
+ :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+ created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
+ """
+ if schema is not None:
+ self.schema(schema)
+ if primitivesAsString is not None:
+ self.option("primitivesAsString", primitivesAsString)
+ if prefersDecimal is not None:
+ self.option("prefersDecimal", prefersDecimal)
+ if allowComments is not None:
+ self.option("allowComments", allowComments)
+ if allowUnquotedFieldNames is not None:
+ self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+ if allowSingleQuotes is not None:
+ self.option("allowSingleQuotes", allowSingleQuotes)
+ if allowNumericLeadingZero is not None:
+ self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+ if allowBackslashEscapingAnyCharacter is not None:
+ self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter)
+ if mode is not None:
+ self.option("mode", mode)
+ if columnNameOfCorruptRecord is not None:
+ self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.json(path))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def parquet(self, path):
+ """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+
+ You can set the following Parquet-specific option(s) for reading Parquet files:
+ * ``mergeSchema``: sets whether we should merge schemas collected from all \
+ Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
+ The default value is specified in ``spark.sql.parquet.mergeSchema``.
+
+ .. note:: Experimental.
+
+ """
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.parquet(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def text(self, path):
+ """
+ Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
+ string column named "value", and followed by partitioned columns if there
+ are any.
+
+ Each line in the text file is a new row in the resulting DataFrame.
+
+ .. note:: Experimental.
+
+ :param paths: string, or list of strings, for input path(s).
+
+ """
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
+ comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
+ ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+ """Loads a CSV file stream and returns the result as a :class:`DataFrame`.
+
+ This function will go through the input once to determine the input schema if
+ ``inferSchema`` is enabled. To avoid going through the entire data once, disable
+ ``inferSchema`` option or specify the schema explicitly using ``schema``.
+
+ .. note:: Experimental.
+
+ :param path: string, or list of strings, for input path(s).
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param sep: sets the single character as a separator for each field and value.
+ If None is set, it uses the default value, ``,``.
+ :param encoding: decodes the CSV files by the given encoding type. If None is set,
+ it uses the default value, ``UTF-8``.
+ :param quote: sets the single character used for escaping quoted values where the
+ separator can be part of the value. If None is set, it uses the default
+ value, ``"``. If you would like to turn off quotations, you need to set an
+ empty string.
+ :param escape: sets the single character used for escaping quotes inside an already
+ quoted value. If None is set, it uses the default value, ``\``.
+ :param comment: sets the single character used for skipping lines beginning with this
+ character. By default (None), it is disabled.
+ :param header: uses the first line as names of columns. If None is set, it uses the
+ default value, ``false``.
+ :param inferSchema: infers the input schema automatically from data. It requires one extra
+ pass over the data. If None is set, it uses the default value, ``false``.
+ :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param nullValue: sets the string representation of a null value. If None is set, it uses
+ the default value, empty string.
+ :param nanValue: sets the string representation of a non-number value. If None is set, it
+ uses the default value, ``NaN``.
+ :param positiveInf: sets the string representation of a positive infinity value. If None
+ is set, it uses the default value, ``Inf``.
+ :param negativeInf: sets the string representation of a negative infinity value. If None
+ is set, it uses the default value, ``Inf``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to both date type and timestamp type. By default, it is None
+ which means trying to parse times and date by
+ ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ :param maxColumns: defines a hard limit of how many columns a record can have. If None is
+ set, it uses the default value, ``20480``.
+ :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
+ value being read. If None is set, it uses the default value,
+ ``1000000``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
+ When a schema is set by user, it sets ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets corrupted records.
+
+ """
+ if schema is not None:
+ self.schema(schema)
+ if sep is not None:
+ self.option("sep", sep)
+ if encoding is not None:
+ self.option("encoding", encoding)
+ if quote is not None:
+ self.option("quote", quote)
+ if escape is not None:
+ self.option("escape", escape)
+ if comment is not None:
+ self.option("comment", comment)
+ if header is not None:
+ self.option("header", header)
+ if inferSchema is not None:
+ self.option("inferSchema", inferSchema)
+ if ignoreLeadingWhiteSpace is not None:
+ self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+ if ignoreTrailingWhiteSpace is not None:
+ self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+ if nullValue is not None:
+ self.option("nullValue", nullValue)
+ if nanValue is not None:
+ self.option("nanValue", nanValue)
+ if positiveInf is not None:
+ self.option("positiveInf", positiveInf)
+ if negativeInf is not None:
+ self.option("negativeInf", negativeInf)
+ if dateFormat is not None:
+ self.option("dateFormat", dateFormat)
+ if maxColumns is not None:
+ self.option("maxColumns", maxColumns)
+ if maxCharsPerColumn is not None:
+ self.option("maxCharsPerColumn", maxCharsPerColumn)
+ if mode is not None:
+ self.option("mode", mode)
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+
+class DataStreamWriter(object):
+ """
+ Interface used to write a streaming :class:`DataFrame` to external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, df):
+ self._df = df
+ self._spark = df.sql_ctx
+ self._jwrite = df._jdf.writeStream()
+
+ def _cq(self, jcq):
+ from pyspark.sql.streaming import ContinuousQuery
+ return ContinuousQuery(jcq)
+
+ @since(2.0)
+ def outputMode(self, outputMode):
+ """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+
+ Options include:
+
+ * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to
+ the sink
+ * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
+ every time these is some updates
+
+ .. note:: Experimental.
+
+ >>> writer = sdf.writeStream.outputMode('append')
+ """
+ if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
+ raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
+ self._jwrite = self._jwrite.outputMode(outputMode)
+ return self
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the underlying output data source.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ >>> writer = sdf.writeStream.format('json')
+ """
+ self._jwrite = self._jwrite.format(source)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an output option for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ self._jwrite = self._jwrite.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds output options for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ for k in options:
+ self._jwrite = self._jwrite.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def partitionBy(self, *cols):
+ """Partitions the output by the given columns on the file system.
+
+ If specified, the output is laid out on the file system similar
+ to Hive's partitioning scheme.
+
+ .. note:: Experimental.
+
+ :param cols: name of columns
+
+ """
+ if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+ cols = cols[0]
+ self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
+ return self
+
+ @since(2.0)
+ def queryName(self, queryName):
+ """Specifies the name of the :class:`ContinuousQuery` that can be started with
+ :func:`startStream`. This name must be unique among all the currently active queries
+ in the associated SparkSession.
+
+ .. note:: Experimental.
+
+ :param queryName: unique name for the query
+
+ >>> writer = sdf.writeStream.queryName('streaming_query')
+ """
+ if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
+ raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
+ self._jwrite = self._jwrite.queryName(queryName)
+ return self
+
+ @keyword_only
+ @since(2.0)
+ def trigger(self, processingTime=None):
+ """Set the trigger for the stream query. If this is not set it will run the query as fast
+ as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+
+ .. note:: Experimental.
+
+ :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+
+ >>> # trigger the query for execution every 5 seconds
+ >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
+ """
+ from pyspark.sql.streaming import ProcessingTime
+ trigger = None
+ if processingTime is not None:
+ if type(processingTime) != str or len(processingTime.strip()) == 0:
+ raise ValueError('The processing time must be a non empty string. Got: %s' %
+ processingTime)
+ trigger = ProcessingTime(processingTime)
+ if trigger is None:
+ raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
+ self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
+ return self
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+ """Streams the contents of the :class:`DataFrame` to a data source.
+
+ The data source is specified by the ``format`` and a set of ``options``.
+ If ``format`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ .. note:: Experimental.
+
+ :param path: the path in a Hadoop supported file system
+ :param format: the format used to save
+
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
+ :param partitionBy: names of partitioning columns
+ :param queryName: unique name for the query
+ :param options: All other string options. You may want to provide a `checkpointLocation`
+ for most streams, however it is not required for a `memory` stream.
+
+ >>> cq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> cq.isActive
+ True
+ >>> cq.name
+ u'this_query'
+ >>> cq.stop()
+ >>> cq.isActive
+ False
+ >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+ ... queryName='that_query', format='memory')
+ >>> cq.name
+ u'that_query'
+ >>> cq.isActive
+ True
+ >>> cq.stop()
+ """
+ self.options(**options)
+ if partitionBy is not None:
+ self.partitionBy(partitionBy)
+ if format is not None:
+ self.format(format)
+ if queryName is not None:
+ self.queryName(queryName)
+ if path is None:
+ return self._cq(self._jwrite.start())
+ else:
+ return self._cq(self._jwrite.start(path))
+
+
def _test():
import doctest
import os
@@ -929,7 +1285,7 @@ def _test():
globs['spark'] = spark
globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
globs['sdf'] = \
- spark.read.format('text').stream('python/test_support/sql/streaming')
+ spark.readStream.format('text').load('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index f0bf0923b8..11c815dd94 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -31,7 +31,7 @@ from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.sql.catalog import Catalog
from pyspark.sql.conf import RuntimeConfig
from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
from pyspark.sql.types import Row, DataType, StringType, StructType, _verify_type, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
from pyspark.sql.utils import install_exception_handler
@@ -551,11 +551,26 @@ class SparkSession(object):
@property
@since(2.0)
+ def readStream(self):
+ """
+ Returns a :class:`DataStreamReader` that can be used to read data streams
+ as a streaming :class:`DataFrame`.
+
+ .. note:: Experimental.
+
+ :return: :class:`DataStreamReader`
+ """
+ return DataStreamReader(self._wrapped)
+
+ @property
+ @since(2.0)
def streams(self):
"""Returns a :class:`ContinuousQueryManager` that allows managing all the
:class:`ContinuousQuery` ContinuousQueries active on `this` context.
.. note:: Experimental.
+
+ :return: :class:`ContinuousQueryManager`
"""
from pyspark.sql.streaming import ContinuousQueryManager
return ContinuousQueryManager(self._jsparkSession.streams())
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index bb4e62cdd6..0edaa51549 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -119,7 +119,7 @@ class ContinuousQueryManager(object):
def active(self):
"""Returns a list of active queries associated with this SQLContext
- >>> cq = df.write.format('memory').queryName('this_query').startStream()
+ >>> cq = df.writeStream.format('memory').queryName('this_query').start()
>>> cqm = spark.streams
>>> # get the list of active continuous queries
>>> [q.name for q in cqm.active]
@@ -134,7 +134,7 @@ class ContinuousQueryManager(object):
"""Returns an active query from this SQLContext or throws exception if an active query
with this name doesn't exist.
- >>> cq = df.write.format('memory').queryName('this_query').startStream()
+ >>> cq = df.writeStream.format('memory').queryName('this_query').start()
>>> cq.name
u'this_query'
>>> cq = spark.streams.get(cq.id)
@@ -236,7 +236,7 @@ def _test():
globs = pyspark.sql.streaming.__dict__.copy()
try:
- spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+ spark = SparkSession.builder.getOrCreate()
except py4j.protocol.Py4JError:
spark = SparkSession(sc)
@@ -245,7 +245,7 @@ def _test():
globs['spark'] = spark
globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
globs['df'] = \
- globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
+ globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index e0acde6783..fee960a1a7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -892,9 +892,9 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_trigger_takes_keyword_args(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
try:
- df.write.trigger('5 seconds')
+ df.writeStream.trigger('5 seconds')
self.fail("Should have thrown an exception")
except TypeError:
# should throw error
@@ -902,22 +902,25 @@ class SQLTests(ReusedPySparkTestCase):
def test_stream_read_options(self):
schema = StructType([StructField("data", StringType(), False)])
- df = self.spark.read.format('text').option('path', 'python/test_support/sql/streaming')\
- .schema(schema).stream()
+ df = self.spark.readStream\
+ .format('text')\
+ .option('path', 'python/test_support/sql/streaming')\
+ .schema(schema)\
+ .load()
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)])
schema = StructType([StructField("data", StringType(), False)])
- df = self.spark.read.format('csv').option('path', 'python/test_support/sql/fake') \
- .schema(bad_schema).stream(path='python/test_support/sql/streaming',
- schema=schema, format='text')
+ df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
+ .schema(bad_schema)\
+ .load(path='python/test_support/sql/streaming', schema=schema, format='text')
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_save_options(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -925,8 +928,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
- .format('parquet').outputMode('append').option('path', out).startStream()
+ cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+ .format('parquet').outputMode('append').option('path', out).start()
try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)
@@ -941,7 +944,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_save_options_overwrite(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -951,9 +954,10 @@ class SQLTests(ReusedPySparkTestCase):
chk = os.path.join(tmpPath, 'chk')
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
- cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
+ cq = df.writeStream.option('checkpointLocation', fake1)\
+ .format('memory').option('path', fake2) \
.queryName('fake_query').outputMode('append') \
- .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertEqual(cq.name, 'this_query')
@@ -971,7 +975,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_await_termination(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -979,8 +983,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ cq = df.writeStream\
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertTrue(cq.isActive)
try:
@@ -999,7 +1003,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_query_manager_await_termination(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -1007,8 +1011,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ cq = df.writeStream\
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertTrue(cq.isActive)
try:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index dfe31da3f3..2ae854d04f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,7 +22,6 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.Partition
-import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -30,12 +29,11 @@ import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
-import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
/**
* Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
- * key-value stores, etc) or data streams. Use [[SparkSession.read]] to access this.
+ * key-value stores, etc). Use [[SparkSession.read]] to access this.
*
* @since 1.4.0
*/
@@ -160,36 +158,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
options = extraOptions.toMap).resolveRelation())
}
}
-
- /**
- * :: Experimental ::
- * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
- * (e.g. external key-value stores).
- *
- * @since 2.0.0
- */
- @Experimental
- def stream(): DataFrame = {
- val dataSource =
- DataSource(
- sparkSession,
- userSpecifiedSchema = userSpecifiedSchema,
- className = source,
- options = extraOptions.toMap)
- Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
- }
-
- /**
- * :: Experimental ::
- * Loads input in as a [[DataFrame]], for data streams that read from some path.
- *
- * @since 2.0.0
- */
- @Experimental
- def stream(path: String): DataFrame = {
- option("path", path).stream()
- }
-
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 171b1378e5..60a9d1f020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,20 +23,15 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
-import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
-import org.apache.spark.util.Utils
/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
- * key-value stores, etc) or data streams. Use [[Dataset.write]] to access this.
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
*
* @since 1.4.0
*/
@@ -54,9 +49,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter[T] = {
- // mode() is used for non-continuous queries
- // outputMode() is used for continuous queries
- assertNotStreaming("mode() can only be called on non-continuous queries")
this.mode = saveMode
this
}
@@ -71,9 +63,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter[T] = {
- // mode() is used for non-continuous queries
- // outputMode() is used for continuous queries
- assertNotStreaming("mode() can only be called on non-continuous queries")
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
@@ -86,76 +75,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
/**
- * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
- * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
- * written to the sink
- * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
- * to the sink every time these is some updates
- *
- * @since 2.0.0
- */
- @Experimental
- def outputMode(outputMode: OutputMode): DataFrameWriter[T] = {
- assertStreaming("outputMode() can only be called on continuous queries")
- this.outputMode = outputMode
- this
- }
-
- /**
- * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
- * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
- * the sink
- * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
- * every time these is some updates
- *
- * @since 2.0.0
- */
- @Experimental
- def outputMode(outputMode: String): DataFrameWriter[T] = {
- assertStreaming("outputMode() can only be called on continuous queries")
- this.outputMode = outputMode.toLowerCase match {
- case "append" =>
- OutputMode.Append
- case "complete" =>
- OutputMode.Complete
- case _ =>
- throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
- "Accepted output modes are 'append' and 'complete'")
- }
- this
- }
-
- /**
- * :: Experimental ::
- * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
- * the query as fast as possible.
- *
- * Scala Example:
- * {{{
- * df.write.trigger(ProcessingTime("10 seconds"))
- *
- * import scala.concurrent.duration._
- * df.write.trigger(ProcessingTime(10.seconds))
- * }}}
- *
- * Java Example:
- * {{{
- * df.write.trigger(ProcessingTime.create("10 seconds"))
- *
- * import java.util.concurrent.TimeUnit
- * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
- * }}}
- *
- * @since 2.0.0
- */
- @Experimental
- def trigger(trigger: Trigger): DataFrameWriter[T] = {
- assertStreaming("trigger() can only be called on continuous queries")
- this.trigger = trigger
- this
- }
-
- /**
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
*
* @since 1.4.0
@@ -284,7 +203,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
def save(): Unit = {
assertNotBucketed("save")
- assertNotStreaming("save() can only be called on non-continuous queries")
val dataSource = DataSource(
df.sparkSession,
className = source,
@@ -294,148 +212,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
dataSource.write(mode, df)
}
-
- /**
- * :: Experimental ::
- * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
- * This name must be unique among all the currently active queries in the associated SQLContext.
- *
- * @since 2.0.0
- */
- @Experimental
- def queryName(queryName: String): DataFrameWriter[T] = {
- assertStreaming("queryName() can only be called on continuous queries")
- this.extraOptions += ("queryName" -> queryName)
- this
- }
-
- /**
- * :: Experimental ::
- * Starts the execution of the streaming query, which will continually output results to the given
- * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
- * the stream.
- *
- * @since 2.0.0
- */
- @Experimental
- def startStream(path: String): ContinuousQuery = {
- option("path", path).startStream()
- }
-
- /**
- * :: Experimental ::
- * Starts the execution of the streaming query, which will continually output results to the given
- * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
- * the stream.
- *
- * @since 2.0.0
- */
- @Experimental
- def startStream(): ContinuousQuery = {
- assertNotBucketed("startStream")
- assertStreaming("startStream() can only be called on continuous queries")
-
- if (source == "memory") {
- if (extraOptions.get("queryName").isEmpty) {
- throw new AnalysisException("queryName must be specified for memory sink")
- }
-
- val sink = new MemorySink(df.schema, outputMode)
- val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
- val query = df.sparkSession.sessionState.continuousQueryManager.startQuery(
- extraOptions.get("queryName"),
- extraOptions.get("checkpointLocation"),
- df,
- sink,
- outputMode,
- useTempCheckpointLocation = true,
- recoverFromCheckpointLocation = false,
- trigger = trigger)
- resultDf.createOrReplaceTempView(query.name)
- query
- } else {
- val dataSource =
- DataSource(
- df.sparkSession,
- className = source,
- options = extraOptions.toMap,
- partitionColumns = normalizedParCols.getOrElse(Nil))
- df.sparkSession.sessionState.continuousQueryManager.startQuery(
- extraOptions.get("queryName"),
- extraOptions.get("checkpointLocation"),
- df,
- dataSource.createSink(outputMode),
- outputMode,
- trigger = trigger)
- }
- }
-
- /**
- * :: Experimental ::
- * Starts the execution of the streaming query, which will continually send results to the given
- * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can be used to send the data
- * generated by the [[DataFrame]]/[[Dataset]] to an external system. The returned The returned
- * [[ContinuousQuery]] object can be used to interact with the stream.
- *
- * Scala example:
- * {{{
- * datasetOfString.write.foreach(new ForeachWriter[String] {
- *
- * def open(partitionId: Long, version: Long): Boolean = {
- * // open connection
- * }
- *
- * def process(record: String) = {
- * // write string to connection
- * }
- *
- * def close(errorOrNull: Throwable): Unit = {
- * // close the connection
- * }
- * })
- * }}}
- *
- * Java example:
- * {{{
- * datasetOfString.write().foreach(new ForeachWriter<String>() {
- *
- * @Override
- * public boolean open(long partitionId, long version) {
- * // open connection
- * }
- *
- * @Override
- * public void process(String value) {
- * // write string to connection
- * }
- *
- * @Override
- * public void close(Throwable errorOrNull) {
- * // close the connection
- * }
- * });
- * }}}
- *
- * @since 2.0.0
- */
- @Experimental
- def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
- assertNotPartitioned("foreach")
- assertNotBucketed("foreach")
- assertStreaming(
- "foreach() can only be called on streaming Datasets/DataFrames.")
-
- val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
- df.sparkSession.sessionState.continuousQueryManager.startQuery(
- extraOptions.get("queryName"),
- extraOptions.get("checkpointLocation"),
- df,
- sink,
- outputMode,
- useTempCheckpointLocation = true,
- trigger = trigger)
- }
-
/**
* Inserts the content of the [[DataFrame]] to the specified table. It requires that
* the schema of the [[DataFrame]] is the same as the schema of the table.
@@ -467,7 +243,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private def insertInto(tableIdent: TableIdentifier): Unit = {
assertNotBucketed("insertInto")
- assertNotStreaming("insertInto() can only be called on non-continuous queries")
val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
val overwrite = mode == SaveMode.Overwrite
@@ -586,7 +361,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- assertNotStreaming("saveAsTable() can only be called on non-continuous queries")
val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
@@ -629,7 +403,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")
- assertNotStreaming("jdbc() can only be called on non-continuous queries")
val props = new Properties()
extraOptions.foreach { case (key, value) =>
@@ -688,7 +461,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def json(path: String): Unit = {
- assertNotStreaming("json() can only be called on non-continuous queries")
format("json").save(path)
}
@@ -708,7 +480,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def parquet(path: String): Unit = {
- assertNotStreaming("parquet() can only be called on non-continuous queries")
format("parquet").save(path)
}
@@ -728,7 +499,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @note Currently, this method can only be used after enabling Hive support
*/
def orc(path: String): Unit = {
- assertNotStreaming("orc() can only be called on non-continuous queries")
format("orc").save(path)
}
@@ -752,7 +522,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.6.0
*/
def text(path: String): Unit = {
- assertNotStreaming("text() can only be called on non-continuous queries")
format("text").save(path)
}
@@ -782,7 +551,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 2.0.0
*/
def csv(path: String): Unit = {
- assertNotStreaming("csv() can only be called on non-continuous queries")
format("csv").save(path)
}
@@ -794,10 +562,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private var mode: SaveMode = SaveMode.ErrorIfExists
- private var outputMode: OutputMode = OutputMode.Append
-
- private var trigger: Trigger = ProcessingTime(0L)
-
private var extraOptions = new scala.collection.mutable.HashMap[String, String]
private var partitioningColumns: Option[Seq[String]] = None
@@ -807,21 +571,4 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private var numBuckets: Option[Int] = None
private var sortColumnNames: Option[Seq[String]] = None
-
- ///////////////////////////////////////////////////////////////////////////////////////
- // Helper functions
- ///////////////////////////////////////////////////////////////////////////////////////
-
- private def assertNotStreaming(errMsg: String): Unit = {
- if (df.isStreaming) {
- throw new AnalysisException(errMsg)
- }
- }
-
- private def assertStreaming(errMsg: String): Unit = {
- if (!df.isStreaming) {
- throw new AnalysisException(errMsg)
- }
- }
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 53779df3d9..f9db325ea2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
-import org.apache.spark.sql.streaming.ContinuousQuery
+import org.apache.spark.sql.streaming.{ContinuousQuery, DataStreamWriter}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -2407,13 +2407,36 @@ class Dataset[T] private[sql](
/**
* :: Experimental ::
- * Interface for saving the content of the Dataset out into external storage or streams.
+ * Interface for saving the content of the non-streaming Dataset out into external storage.
*
* @group basic
* @since 1.6.0
*/
@Experimental
- def write: DataFrameWriter[T] = new DataFrameWriter[T](this)
+ def write: DataFrameWriter[T] = {
+ if (isStreaming) {
+ logicalPlan.failAnalysis(
+ "'write' can not be called on streaming Dataset/DataFrame")
+ }
+ new DataFrameWriter[T](this)
+ }
+
+ /**
+ * :: Experimental ::
+ * Interface for saving the content of the streaming Dataset out into external storage.
+ *
+ * @group basic
+ * @since 2.0.0
+ */
+ @Experimental
+ def writeStream: DataStreamWriter[T] = {
+ if (!isStreaming) {
+ logicalPlan.failAnalysis(
+ "'writeStream' can be called only on streaming Dataset/DataFrame")
+ }
+ new DataStreamWriter[T](this)
+ }
+
/**
* Returns the content of the Dataset as a Dataset of JSON strings.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 58b4e6c5f6..33f62915df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.streaming.ContinuousQueryManager
+import org.apache.spark.sql.streaming.{ContinuousQueryManager, DataStreamReader}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -491,7 +491,8 @@ class SQLContext private[sql](val sparkSession: SparkSession)
}
/**
- * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
+ * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
+ * [[DataFrame]].
* {{{
* sqlContext.read.parquet("/path/to/file.parquet")
* sqlContext.read.schema(schema).json("/path/to/file.json")
@@ -502,6 +503,21 @@ class SQLContext private[sql](val sparkSession: SparkSession)
*/
def read: DataFrameReader = sparkSession.read
+
+ /**
+ * :: Experimental ::
+ * Returns a [[DataStreamReader]] that can be used to read streaming data in as a [[DataFrame]].
+ * {{{
+ * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
+ * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def readStream: DataStreamReader = sparkSession.readStream
+
+
/**
* Creates an external table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 01c2e3ad29..9137a735dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -574,7 +574,8 @@ class SparkSession private(
}
/**
- * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
+ * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
+ * [[DataFrame]].
* {{{
* sparkSession.read.parquet("/path/to/file.parquet")
* sparkSession.read.schema(schema).json("/path/to/file.json")
@@ -584,6 +585,19 @@ class SparkSession private(
*/
def read: DataFrameReader = new DataFrameReader(self)
+ /**
+ * :: Experimental ::
+ * Returns a [[DataStreamReader]] that can be used to read streaming data in as a [[DataFrame]].
+ * {{{
+ * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
+ * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def readStream: DataStreamReader = new DataStreamReader(self)
+
// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
new file mode 100644
index 0000000000..248247a257
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming [[Dataset]] from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use [[SparkSession.readStream]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
+ /**
+ * :: Experimental ::
+ * Specifies the input data source format.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def format(source: String): DataStreamReader = {
+ this.source = source
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+ * automatically from data. By specifying the schema here, the underlying data source can
+ * skip the schema inference step, and thus speed up data loading.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def schema(schema: StructType): DataStreamReader = {
+ this.userSpecifiedSchema = Option(schema)
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: String): DataStreamReader = {
+ this.extraOptions += (key -> value)
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
+
+ /**
+ * :: Experimental ::
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
+
+ /**
+ * :: Experimental ::
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific) Adds input options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds input options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def options(options: java.util.Map[String, String]): DataStreamReader = {
+ this.options(options.asScala)
+ this
+ }
+
+
+ /**
+ * :: Experimental ::
+ * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
+ * (e.g. external key-value stores).
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def load(): DataFrame = {
+ val dataSource =
+ DataSource(
+ sparkSession,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = source,
+ options = extraOptions.toMap)
+ Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
+ }
+
+ /**
+ * :: Experimental ::
+ * Loads input in as a [[DataFrame]], for data streams that read from some path.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def load(path: String): DataFrame = {
+ option("path", path).load()
+ }
+
+ /**
+ * :: Experimental ::
+ * Loads a JSON file stream (one object per line) and returns the result as a [[DataFrame]].
+ *
+ * This function goes through the input once to determine the input schema. If you know the
+ * schema in advance, use the version that specifies the schema to avoid the extra scan.
+ *
+ * You can set the following JSON-specific options to deal with non-standard JSON files:
+ * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
+ * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
+ * type. If the values do not fit in decimal, then it infers them as doubles.</li>
+ * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
+ * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
+ * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
+ * </li>
+ * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
+ * (e.g. 00012)</li>
+ * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
+ * character using backslash quoting mechanism</li>
+ * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+ * during parsing.</li>
+ * <ul>
+ * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
+ * malformed string into a new field configured by `columnNameOfCorruptRecord`. When
+ * a schema is set by user, it sets `null` for extra fields.</li>
+ * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
+ * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
+ * </ul>
+ * <li>`columnNameOfCorruptRecord` (default is the value specified in
+ * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
+ * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def json(path: String): DataFrame = format("json").load(path)
+
+ /**
+ * :: Experimental ::
+ * Loads a CSV file stream and returns the result as a [[DataFrame]].
+ *
+ * This function will go through the input once to determine the input schema if `inferSchema`
+ * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+ * specify the schema explicitly using [[schema]].
+ *
+ * You can set the following CSV-specific options to deal with CSV files:
+ * <li>`sep` (default `,`): sets the single character as a separator for each
+ * field and value.</li>
+ * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding
+ * type.</li>
+ * <li>`quote` (default `"`): sets the single character used for escaping quoted values where
+ * the separator can be part of the value. If you would like to turn off quotations, you need to
+ * set not `null` but an empty string. This behaviour is different form
+ * `com.databricks.spark.csv`.</li>
+ * <li>`escape` (default `\`): sets the single character used for escaping quotes inside
+ * an already quoted value.</li>
+ * <li>`comment` (default empty string): sets the single character used for skipping lines
+ * beginning with this character. By default, it is disabled.</li>
+ * <li>`header` (default `false`): uses the first line as names of columns.</li>
+ * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
+ * requires one extra pass over the data.</li>
+ * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
+ * from values being read should be skipped.</li>
+ * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+ * whitespaces from values being read should be skipped.</li>
+ * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+ * <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
+ * <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
+ * value.</li>
+ * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
+ * value.</li>
+ * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
+ * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
+ * and timestamp type. By default, it is `null` which means trying to parse times and date by
+ * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+ * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
+ * a record can have.</li>
+ * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
+ * for any given value being read.</li>
+ * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+ * during parsing.</li>
+ * <ul>
+ * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When
+ * a schema is set by user, it sets `null` for extra fields.</li>
+ * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
+ * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
+ * </ul>
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def csv(path: String): DataFrame = format("csv").load(path)
+
+ /**
+ * :: Experimental ::
+ * Loads a Parquet file stream, returning the result as a [[DataFrame]].
+ *
+ * You can set the following Parquet-specific option(s) for reading Parquet files:
+ * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
+ * whether we should merge schemas collected from all Parquet part-files. This will override
+ * `spark.sql.parquet.mergeSchema`.</li>
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def parquet(path: String): DataFrame = {
+ format("parquet").load(path)
+ }
+
+ /**
+ * :: Experimental ::
+ * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+ * "value", and followed by partitioned columns if there are any.
+ *
+ * Each line in the text files is a new row in the resulting DataFrame. For example:
+ * {{{
+ * // Scala:
+ * spark.readStream.text("/path/to/directory/")
+ *
+ * // Java:
+ * spark.readStream().text("/path/to/directory/")
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def text(path: String): DataFrame = format("text").load(path)
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
+
+ private var userSpecifiedSchema: Option[StructType] = None
+
+ private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
new file mode 100644
index 0000000000..b035ff7938
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.writeStream]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+ private val df = ds.toDF()
+
+ /**
+ * :: Experimental ::
+ * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+ * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
+ * written to the sink
+ * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
+ * to the sink every time these is some updates
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+ this.outputMode = outputMode
+ this
+ }
+
+
+ /**
+ * :: Experimental ::
+ * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+ * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
+ * the sink
+ * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
+ * every time these is some updates
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def outputMode(outputMode: String): DataStreamWriter[T] = {
+ this.outputMode = outputMode.toLowerCase match {
+ case "append" =>
+ OutputMode.Append
+ case "complete" =>
+ OutputMode.Complete
+ case _ =>
+ throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
+ "Accepted output modes are 'append' and 'complete'")
+ }
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
+ * the query as fast as possible.
+ *
+ * Scala Example:
+ * {{{
+ * df.writeStream.trigger(ProcessingTime("10 seconds"))
+ *
+ * import scala.concurrent.duration._
+ * df.writeStream.trigger(ProcessingTime(10.seconds))
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ * df.writeStream().trigger(ProcessingTime.create("10 seconds"))
+ *
+ * import java.util.concurrent.TimeUnit
+ * df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def trigger(trigger: Trigger): DataStreamWriter[T] = {
+ this.trigger = trigger
+ this
+ }
+
+
+ /**
+ * :: Experimental ::
+ * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
+ * This name must be unique among all the currently active queries in the associated SQLContext.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def queryName(queryName: String): DataStreamWriter[T] = {
+ this.extraOptions += ("queryName" -> queryName)
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def format(source: String): DataStreamWriter[T] = {
+ this.source = source
+ this
+ }
+
+ /**
+ * Partitions the output by the given columns on the file system. If specified, the output is
+ * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+ * partition a dataset by year and then month, the directory layout would look like:
+ *
+ * - year=2016/month=01/
+ * - year=2016/month=02/
+ *
+ * Partitioning is one of the most widely used techniques to optimize physical data layout.
+ * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+ * predicates on the partitioned columns. In order for partitioning to work well, the number
+ * of distinct values in each column should typically be less than tens of thousands.
+ *
+ * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
+ *
+ * @since 1.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colNames: String*): DataStreamWriter[T] = {
+ this.partitioningColumns = Option(colNames)
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds an output option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: String): DataStreamWriter[T] = {
+ this.extraOptions += (key -> value)
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds an output option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
+
+ /**
+ * :: Experimental ::
+ * Adds an output option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
+
+ /**
+ * :: Experimental ::
+ * Adds an output option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific) Adds output options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds output options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
+ this.options(options.asScala)
+ this
+ }
+
+ /**
+ * :: Experimental ::
+ * Starts the execution of the streaming query, which will continually output results to the given
+ * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * the stream.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def start(path: String): ContinuousQuery = {
+ option("path", path).start()
+ }
+
+ /**
+ * :: Experimental ::
+ * Starts the execution of the streaming query, which will continually output results to the given
+ * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * the stream.
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def start(): ContinuousQuery = {
+ if (source == "memory") {
+ assertNotPartitioned("memory")
+ if (extraOptions.get("queryName").isEmpty) {
+ throw new AnalysisException("queryName must be specified for memory sink")
+ }
+
+ val sink = new MemorySink(df.schema, outputMode)
+ val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+ val query = df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
+ df,
+ sink,
+ outputMode,
+ useTempCheckpointLocation = true,
+ recoverFromCheckpointLocation = false,
+ trigger = trigger)
+ resultDf.createOrReplaceTempView(query.name)
+ query
+ } else if (source == "foreach") {
+ assertNotPartitioned("foreach")
+ val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
+ df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
+ df,
+ sink,
+ outputMode,
+ useTempCheckpointLocation = true,
+ trigger = trigger)
+ } else {
+ val dataSource =
+ DataSource(
+ df.sparkSession,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
+ df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
+ df,
+ dataSource.createSink(outputMode),
+ outputMode,
+ trigger = trigger)
+ }
+ }
+
+ /**
+ * :: Experimental ::
+ * Starts the execution of the streaming query, which will continually send results to the given
+ * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can be used to send the data
+ * generated by the [[DataFrame]]/[[Dataset]] to an external system.
+ *
+ * Scala example:
+ * {{{
+ * datasetOfString.writeStream.foreach(new ForeachWriter[String] {
+ *
+ * def open(partitionId: Long, version: Long): Boolean = {
+ * // open connection
+ * }
+ *
+ * def process(record: String) = {
+ * // write string to connection
+ * }
+ *
+ * def close(errorOrNull: Throwable): Unit = {
+ * // close the connection
+ * }
+ * }).start()
+ * }}}
+ *
+ * Java example:
+ * {{{
+ * datasetOfString.writeStream().foreach(new ForeachWriter<String>() {
+ *
+ * @Override
+ * public boolean open(long partitionId, long version) {
+ * // open connection
+ * }
+ *
+ * @Override
+ * public void process(String value) {
+ * // write string to connection
+ * }
+ *
+ * @Override
+ * public void close(Throwable errorOrNull) {
+ * // close the connection
+ * }
+ * }).start();
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+ this.source = "foreach"
+ this.foreachWriter = if (writer != null) {
+ ds.sparkSession.sparkContext.clean(writer)
+ } else {
+ throw new IllegalArgumentException("foreach writer cannot be null")
+ }
+ this
+ }
+
+ private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
+ cols.map(normalize(_, "Partition"))
+ }
+
+ /**
+ * The given column name may not be equal to any of the existing column names if we were in
+ * case-insensitive context. Normalize the given column name to the real one so that we don't
+ * need to care about case sensitivity afterwards.
+ */
+ private def normalize(columnName: String, columnType: String): String = {
+ val validColumnNames = df.logicalPlan.output.map(_.name)
+ validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName))
+ .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
+ s"existing columns (${validColumnNames.mkString(", ")})"))
+ }
+
+ private def assertNotPartitioned(operation: String): Unit = {
+ if (partitioningColumns.isDefined) {
+ throw new AnalysisException(s"'$operation' does not support partitioning")
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
+
+ private var outputMode: OutputMode = OutputMode.Append
+
+ private var trigger: Trigger = ProcessingTime(0L)
+
+ private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+ private var foreachWriter: ForeachWriter[T] = null
+
+ private var partitioningColumns: Option[Seq[String]] = None
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index e1fb3b9478..6ff597c16b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -38,9 +38,10 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
test("foreach") {
withTempDir { checkpointDir =>
val input = MemoryStream[Int]
- val query = input.toDS().repartition(2).write
+ val query = input.toDS().repartition(2).writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreach(new TestForeachWriter())
+ .start()
input.addData(1, 2, 3, 4)
query.processAllAvailable()
@@ -70,14 +71,14 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
test("foreach with error") {
withTempDir { checkpointDir =>
val input = MemoryStream[Int]
- val query = input.toDS().repartition(1).write
+ val query = input.toDS().repartition(1).writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreach(new TestForeachWriter() {
override def process(value: Int): Unit = {
super.process(value)
throw new RuntimeException("error")
}
- })
+ }).start()
input.addData(1, 2, 3, 4)
query.processAllAvailable()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index f81608bdb8..ef2fcbf73e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -225,12 +225,12 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
val metadataRoot =
Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
query =
- df.write
+ df.writeStream
.format("memory")
.queryName(s"query$i")
.option("checkpointLocation", metadataRoot)
.outputMode("append")
- .startStream()
+ .start()
.asInstanceOf[StreamExecution]
} catch {
case NonFatal(e) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 43a88576cf..ad6bc27729 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -39,12 +39,12 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
def startQuery(queryName: String): ContinuousQuery = {
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
- val writer = mapped.write
+ val writer = mapped.writeStream
writer
.queryName(queryName)
.format("memory")
.option("checkpointLocation", metadataRoot)
- .startStream()
+ .start()
}
val q1 = startQuery("q1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index bb3063dc34..a5acc970e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -128,10 +128,10 @@ class FileStreamSinkSuite extends StreamTest {
try {
query =
- df.write
- .format("parquet")
+ df.writeStream
.option("checkpointLocation", checkpointDir)
- .startStream(outputDir)
+ .format("parquet")
+ .start(outputDir)
inputData.addData(1, 2, 3)
@@ -162,11 +162,11 @@ class FileStreamSinkSuite extends StreamTest {
query =
ds.map(i => (i, i * 1000))
.toDF("id", "value")
- .write
- .format("parquet")
+ .writeStream
.partitionBy("id")
.option("checkpointLocation", checkpointDir)
- .startStream(outputDir)
+ .format("parquet")
+ .start(outputDir)
inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
@@ -246,13 +246,13 @@ class FileStreamSinkSuite extends StreamTest {
val writer =
ds.map(i => (i, i * 1000))
.toDF("id", "value")
- .write
+ .writeStream
if (format.nonEmpty) {
writer.format(format.get)
}
query = writer
.option("checkpointLocation", checkpointDir)
- .startStream(outputDir)
+ .start(outputDir)
} finally {
if (query != null) {
query.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f681b8878d..6971f93b23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -107,11 +107,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
schema: Option[StructType] = None): DataFrame = {
val reader =
if (schema.isDefined) {
- spark.read.format(format).schema(schema.get)
+ spark.readStream.format(format).schema(schema.get)
} else {
- spark.read.format(format)
+ spark.readStream.format(format)
}
- reader.stream(path)
+ reader.load(path)
}
protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
@@ -153,14 +153,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
format: Option[String],
path: Option[String],
schema: Option[StructType] = None): StructType = {
- val reader = spark.read
+ val reader = spark.readStream
format.foreach(reader.format)
schema.foreach(reader.schema)
val df =
if (path.isDefined) {
- reader.stream(path.get)
+ reader.load(path.get)
} else {
- reader.stream()
+ reader.load()
}
df.queryExecution.analyzed
.collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 1c0fb34dd0..0e157cf726 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -98,7 +98,7 @@ class FileStressSuite extends StreamTest {
}
writer.start()
- val input = spark.read.format("text").stream(inputDir)
+ val input = spark.readStream.format("text").load(inputDir)
def startStream(): ContinuousQuery = {
val output = input
@@ -116,17 +116,17 @@ class FileStressSuite extends StreamTest {
if (partitionWrites) {
output
- .write
+ .writeStream
.partitionBy("id")
.format("parquet")
.option("checkpointLocation", checkpoint)
- .startStream(outputDir)
+ .start(outputDir)
} else {
output
- .write
+ .writeStream
.format("parquet")
.option("checkpointLocation", checkpoint)
- .startStream(outputDir)
+ .start(outputDir)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 9aada0b18d..310d756302 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -140,11 +140,11 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
test("registering as a table in Append output mode") {
val input = MemoryStream[Int]
- val query = input.toDF().write
+ val query = input.toDF().writeStream
.format("memory")
.outputMode("append")
.queryName("memStream")
- .startStream()
+ .start()
input.addData(1, 2, 3)
query.processAllAvailable()
@@ -166,11 +166,11 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
val query = input.toDF()
.groupBy("value")
.count()
- .write
+ .writeStream
.format("memory")
.outputMode("complete")
.queryName("memStream")
- .startStream()
+ .start()
input.addData(1, 2, 3)
query.processAllAvailable()
@@ -191,10 +191,10 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
// Ignore the stress test as it takes several minutes to run
(0 until 1000).foreach { _ =>
val input = MemoryStream[Int]
- val query = input.toDF().write
+ val query = input.toDF().writeStream
.format("memory")
.queryName("memStream")
- .startStream()
+ .start()
input.addData(1, 2, 3)
query.processAllAvailable()
@@ -215,9 +215,9 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
test("error when no name is specified") {
val error = intercept[AnalysisException] {
val input = MemoryStream[Int]
- val query = input.toDF().write
+ val query = input.toDF().writeStream
.format("memory")
- .startStream()
+ .start()
}
assert(error.message contains "queryName must be specified")
@@ -227,21 +227,21 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
val input = MemoryStream[Int]
- val query = input.toDF().write
+ val query = input.toDF().writeStream
.format("memory")
.queryName("memStream")
.option("checkpointLocation", location)
- .startStream()
+ .start()
input.addData(1, 2, 3)
query.processAllAvailable()
query.stop()
intercept[AnalysisException] {
- input.toDF().write
+ input.toDF().writeStream
.format("memory")
.queryName("memStream")
.option("checkpointLocation", location)
- .startStream()
+ .start()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 9414b1ce40..786404a589 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -89,9 +89,9 @@ class StreamSuite extends StreamTest {
def assertDF(df: DataFrame) {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
- val query = df.write.format("parquet")
+ val query = df.writeStream.format("parquet")
.option("checkpointLocation", checkpointDir.getAbsolutePath)
- .startStream(outputDir.getAbsolutePath)
+ .start(outputDir.getAbsolutePath)
try {
query.processAllAvailable()
val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
@@ -103,7 +103,7 @@ class StreamSuite extends StreamTest {
}
}
- val df = spark.read.format(classOf[FakeDefaultSource].getName).stream()
+ val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
assertDF(df)
assertDF(df)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 8681199817..7f44227ec4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
import testImplicits._
+
+
test("simple count, update mode") {
val inputData = MemoryStream[Int]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 6e0d66ae7f..c6d374f754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -101,7 +101,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}
}
-class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
+class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -110,25 +110,38 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
spark.streams.active.foreach(_.stop())
}
+ test("write cannot be called on streaming datasets") {
+ val e = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+ .write
+ .save()
+ }
+ Seq("'write'", "not", "streaming Dataset/DataFrame").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+ }
+
test("resolve default source") {
- spark.read
+ spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
- .write
+ .load()
+ .writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .startStream()
+ .start()
.stop()
}
test("resolve full class") {
- spark.read
+ spark.readStream
.format("org.apache.spark.sql.streaming.test.DefaultSource")
- .stream()
- .write
+ .load()
+ .writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .startStream()
+ .start()
.stop()
}
@@ -136,12 +149,12 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
val map = new java.util.HashMap[String, String]
map.put("opt3", "3")
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
.option("opt1", "1")
.options(Map("opt2" -> "2"))
.options(map)
- .stream()
+ .load()
assert(LastOptions.parameters("opt1") == "1")
assert(LastOptions.parameters("opt2") == "2")
@@ -149,13 +162,13 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
LastOptions.clear()
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("opt1", "1")
.options(Map("opt2" -> "2"))
.options(map)
.option("checkpointLocation", newMetadataDir)
- .startStream()
+ .start()
.stop()
assert(LastOptions.parameters("opt1") == "1")
@@ -164,84 +177,84 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
test("partitioning") {
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
+ .load()
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .startStream()
+ .start()
.stop()
assert(LastOptions.partitionColumns == Nil)
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.partitionBy("a")
- .startStream()
+ .start()
.stop()
assert(LastOptions.partitionColumns == Seq("a"))
withSQLConf("spark.sql.caseSensitive" -> "false") {
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.partitionBy("A")
- .startStream()
+ .start()
.stop()
assert(LastOptions.partitionColumns == Seq("a"))
}
intercept[AnalysisException] {
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.partitionBy("b")
- .startStream()
+ .start()
.stop()
}
}
test("stream paths") {
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .stream("/test")
+ .load("/test")
assert(LastOptions.parameters("path") == "/test")
LastOptions.clear()
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .startStream("/test")
+ .start("/test")
.stop()
assert(LastOptions.parameters("path") == "/test")
}
test("test different data types for options") {
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
.option("intOpt", 56)
.option("boolOpt", false)
.option("doubleOpt", 6.7)
- .stream("/test")
+ .load("/test")
assert(LastOptions.parameters("intOpt") == "56")
assert(LastOptions.parameters("boolOpt") == "false")
assert(LastOptions.parameters("doubleOpt") == "6.7")
LastOptions.clear()
- df.write
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("intOpt", 56)
.option("boolOpt", false)
.option("doubleOpt", 6.7)
.option("checkpointLocation", newMetadataDir)
- .startStream("/test")
+ .start("/test")
.stop()
assert(LastOptions.parameters("intOpt") == "56")
@@ -253,25 +266,25 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
/** Start a query with a specific name */
def startQueryWithName(name: String = ""): ContinuousQuery = {
- spark.read
+ spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream("/test")
- .write
+ .load("/test")
+ .writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.queryName(name)
- .startStream()
+ .start()
}
/** Start a query without specifying a name */
def startQueryWithoutName(): ContinuousQuery = {
- spark.read
+ spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream("/test")
- .write
+ .load("/test")
+ .writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
- .startStream()
+ .start()
}
/** Get the names of active streams */
@@ -311,24 +324,24 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
test("trigger") {
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream("/test")
+ .load("/test")
- var q = df.write
+ var q = df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime(10.seconds))
- .startStream()
+ .start()
q.stop()
assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000))
- q = df.write
+ q = df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
- .startStream()
+ .start()
q.stop()
assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
@@ -339,19 +352,19 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
val checkpointLocation = newMetadataDir
- val df1 = spark.read
+ val df1 = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
+ .load()
- val df2 = spark.read
+ val df2 = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
+ .load()
- val q = df1.union(df2).write
+ val q = df1.union(df2).writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", checkpointLocation)
.trigger(ProcessingTime(10.seconds))
- .startStream()
+ .start()
q.stop()
verify(LastOptions.mockStreamSourceProvider).createSource(
@@ -371,76 +384,12 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
- test("check trigger() can only be called on continuous queries") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds")))
- assert(e.getMessage == "trigger() can only be called on continuous queries;")
- }
-
- test("check queryName() can only be called on continuous queries") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val e = intercept[AnalysisException](w.queryName("queryName"))
- assert(e.getMessage == "queryName() can only be called on continuous queries;")
- }
-
- test("check startStream() can only be called on continuous queries") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val e = intercept[AnalysisException](w.startStream())
- assert(e.getMessage == "startStream() can only be called on continuous queries;")
- }
-
- test("check startStream(path) can only be called on continuous queries") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val e = intercept[AnalysisException](w.startStream("non_exist_path"))
- assert(e.getMessage == "startStream() can only be called on continuous queries;")
- }
-
- test("check mode(SaveMode) can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.mode(SaveMode.Append))
- assert(e.getMessage == "mode() can only be called on non-continuous queries;")
- }
-
- test("check mode(string) can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.mode("append"))
- assert(e.getMessage == "mode() can only be called on non-continuous queries;")
- }
-
- test("check outputMode(OutputMode) can only be called on continuous queries") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val e = intercept[AnalysisException](w.outputMode(OutputMode.Append))
- Seq("outputmode", "continuous queries").foreach { s =>
- assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
- }
- }
-
- test("check outputMode(string) can only be called on continuous queries") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val e = intercept[AnalysisException](w.outputMode("append"))
- Seq("outputmode", "continuous queries").foreach { s =>
- assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
- }
- }
-
test("check outputMode(string) throws exception on unsupported modes") {
def testError(outputMode: String): Unit = {
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
+ .load()
+ val w = df.writeStream
val e = intercept[IllegalArgumentException](w.outputMode(outputMode))
Seq("output mode", "unknown", outputMode).foreach { s =>
assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
@@ -450,159 +399,46 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
testError("Xyz")
}
- test("check bucketBy() can only be called on non-continuous queries") {
- val df = spark.read
+ test("check foreach() catches null writers") {
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
- assert(e.getMessage == "'startStream' does not support bucketing right now;")
- }
-
- test("check sortBy() can only be called on non-continuous queries;") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.sortBy("text").startStream())
- assert(e.getMessage == "'startStream' does not support bucketing right now;")
- }
+ .load()
- test("check save(path) can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.save("non_exist_path"))
- assert(e.getMessage == "save() can only be called on non-continuous queries;")
- }
-
- test("check save() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.save())
- assert(e.getMessage == "save() can only be called on non-continuous queries;")
- }
-
- test("check insertInto() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.insertInto("non_exsit_table"))
- assert(e.getMessage == "insertInto() can only be called on non-continuous queries;")
- }
-
- test("check saveAsTable() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table"))
- assert(e.getMessage == "saveAsTable() can only be called on non-continuous queries;")
- }
-
- test("check jdbc() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.jdbc(null, null, null))
- assert(e.getMessage == "jdbc() can only be called on non-continuous queries;")
- }
-
- test("check json() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.json("non_exist_path"))
- assert(e.getMessage == "json() can only be called on non-continuous queries;")
- }
-
- test("check parquet() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.parquet("non_exist_path"))
- assert(e.getMessage == "parquet() can only be called on non-continuous queries;")
- }
-
- test("check orc() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.orc("non_exist_path"))
- assert(e.getMessage == "orc() can only be called on non-continuous queries;")
- }
-
- test("check text() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.text("non_exist_path"))
- assert(e.getMessage == "text() can only be called on non-continuous queries;")
- }
-
- test("check csv() can only be called on non-continuous queries") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- val w = df.write
- val e = intercept[AnalysisException](w.csv("non_exist_path"))
- assert(e.getMessage == "csv() can only be called on non-continuous queries;")
- }
-
- test("check foreach() does not support partitioning or bucketing") {
- val df = spark.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
-
- var w = df.write.partitionBy("value")
- var e = intercept[AnalysisException](w.foreach(null))
- Seq("foreach", "partitioning").foreach { s =>
- assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
- }
-
- w = df.write.bucketBy(2, "value")
- e = intercept[AnalysisException](w.foreach(null))
- Seq("foreach", "bucketing").foreach { s =>
+ var w = df.writeStream
+ var e = intercept[IllegalArgumentException](w.foreach(null))
+ Seq("foreach", "null").foreach { s =>
assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
}
}
- test("check jdbc() does not support partitioning or bucketing") {
- val df = spark.read.text(newTextInput)
- var w = df.write.partitionBy("value")
- var e = intercept[AnalysisException](w.jdbc(null, null, null))
- Seq("jdbc", "partitioning").foreach { s =>
- assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ test("check foreach() does not support partitioning") {
+ val df = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+ val foreachWriter = new ForeachWriter[Row] {
+ override def open(partitionId: Long, version: Long): Boolean = false
+ override def process(value: Row): Unit = {}
+ override def close(errorOrNull: Throwable): Unit = {}
}
-
- w = df.write.bucketBy(2, "value")
- e = intercept[AnalysisException](w.jdbc(null, null, null))
- Seq("jdbc", "bucketing").foreach { s =>
+ var w = df.writeStream.partitionBy("value")
+ var e = intercept[AnalysisException](w.foreach(foreachWriter).start())
+ Seq("foreach", "partitioning").foreach { s =>
assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
}
}
test("ConsoleSink can be correctly loaded") {
LastOptions.clear()
- val df = spark.read
+ val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
- .stream()
+ .load()
- val cq = df.write
+ val cq = df.writeStream
.format("console")
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime(2.seconds))
- .startStream()
+ .start()
cq.awaitTermination(2000L)
}
@@ -611,10 +447,11 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
withTempDir { dir =>
val path = dir.getCanonicalPath
intercept[AnalysisException] {
- spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
- }
- intercept[AnalysisException] {
- spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+ spark.range(10).writeStream
+ .outputMode("append")
+ .partitionBy("id")
+ .format("parquet")
+ .start(path)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
new file mode 100644
index 0000000000..98e57b3804
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+object LastOptions {
+
+ var parameters: Map[String, String] = null
+ var schema: Option[StructType] = null
+ var saveMode: SaveMode = null
+
+ def clear(): Unit = {
+ parameters = null
+ schema = null
+ saveMode = null
+ }
+}
+
+
+/** Dummy provider. */
+class DefaultSource
+ extends RelationProvider
+ with SchemaRelationProvider
+ with CreatableRelationProvider {
+
+ case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+ override def schema: StructType = StructType(Seq(StructField("a", StringType)))
+ }
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: StructType
+ ): BaseRelation = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = Some(schema)
+ FakeRelation(sqlContext)
+ }
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String]
+ ): BaseRelation = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = None
+ FakeRelation(sqlContext)
+ }
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = None
+ LastOptions.saveMode = mode
+ FakeRelation(sqlContext)
+ }
+}
+
+
+class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext {
+
+ private def newMetadataDir =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+ test("writeStream cannot be called on non-streaming datasets") {
+ val e = intercept[AnalysisException] {
+ spark.read
+ .format("org.apache.spark.sql.test")
+ .load()
+ .writeStream
+ .start()
+ }
+ Seq("'writeStream'", "only", "streaming Dataset/DataFrame").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+ }
+
+
+ test("resolve default source") {
+ spark.read
+ .format("org.apache.spark.sql.test")
+ .load()
+ .write
+ .format("org.apache.spark.sql.test")
+ .save()
+ }
+
+ test("resolve full class") {
+ spark.read
+ .format("org.apache.spark.sql.test.DefaultSource")
+ .load()
+ .write
+ .format("org.apache.spark.sql.test")
+ .save()
+ }
+
+ test("options") {
+ val map = new java.util.HashMap[String, String]
+ map.put("opt3", "3")
+
+ val df = spark.read
+ .format("org.apache.spark.sql.test")
+ .option("opt1", "1")
+ .options(Map("opt2" -> "2"))
+ .options(map)
+ .load()
+
+ assert(LastOptions.parameters("opt1") == "1")
+ assert(LastOptions.parameters("opt2") == "2")
+ assert(LastOptions.parameters("opt3") == "3")
+
+ LastOptions.clear()
+
+ df.write
+ .format("org.apache.spark.sql.test")
+ .option("opt1", "1")
+ .options(Map("opt2" -> "2"))
+ .options(map)
+ .save()
+
+ assert(LastOptions.parameters("opt1") == "1")
+ assert(LastOptions.parameters("opt2") == "2")
+ assert(LastOptions.parameters("opt3") == "3")
+ }
+
+ test("save mode") {
+ val df = spark.read
+ .format("org.apache.spark.sql.test")
+ .load()
+
+ df.write
+ .format("org.apache.spark.sql.test")
+ .mode(SaveMode.ErrorIfExists)
+ .save()
+ assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
+ }
+
+ test("paths") {
+ val df = spark.read
+ .format("org.apache.spark.sql.test")
+ .option("checkpointLocation", newMetadataDir)
+ .load("/test")
+
+ assert(LastOptions.parameters("path") == "/test")
+
+ LastOptions.clear()
+
+ df.write
+ .format("org.apache.spark.sql.test")
+ .option("checkpointLocation", newMetadataDir)
+ .save("/test")
+
+ assert(LastOptions.parameters("path") == "/test")
+ }
+
+ test("test different data types for options") {
+ val df = spark.read
+ .format("org.apache.spark.sql.test")
+ .option("intOpt", 56)
+ .option("boolOpt", false)
+ .option("doubleOpt", 6.7)
+ .load("/test")
+
+ assert(LastOptions.parameters("intOpt") == "56")
+ assert(LastOptions.parameters("boolOpt") == "false")
+ assert(LastOptions.parameters("doubleOpt") == "6.7")
+
+ LastOptions.clear()
+ df.write
+ .format("org.apache.spark.sql.test")
+ .option("intOpt", 56)
+ .option("boolOpt", false)
+ .option("doubleOpt", 6.7)
+ .option("checkpointLocation", newMetadataDir)
+ .save("/test")
+
+ assert(LastOptions.parameters("intOpt") == "56")
+ assert(LastOptions.parameters("boolOpt") == "false")
+ assert(LastOptions.parameters("doubleOpt") == "6.7")
+ }
+
+ test("check jdbc() does not support partitioning or bucketing") {
+ val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath)
+
+ var w = df.write.partitionBy("value")
+ var e = intercept[AnalysisException](w.jdbc(null, null, null))
+ Seq("jdbc", "partitioning").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+
+ w = df.write.bucketBy(2, "value")
+ e = intercept[AnalysisException](w.jdbc(null, null, null))
+ Seq("jdbc", "bucketing").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+ }
+
+ test("prevent all column partitioning") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ intercept[AnalysisException] {
+ spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
+ }
+ intercept[AnalysisException] {
+ spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+ }
+ }
+ }
+}