From 5a5b83c97bbab1d717dcc30b09aafb7c0ed85069 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 10 May 2016 21:54:32 -0700
Subject: [SPARK-15261][SQL] Remove experimental tag from DataFrameReader/Writer

## What changes were proposed in this pull request?
This patch removes the experimental tag from DataFrameReader and DataFrameWriter, and explicitly tags a few methods added for structured streaming as experimental.

## How was this patch tested?
N/A

Author: Reynold Xin

Closes #13038 from rxin/SPARK-15261.
---
 python/pyspark/sql/readwriter.py                           | 14 +++++++++-----
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala  |  8 +++++---
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala  | 14 +++++++++-----
 3 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e2ee9db049..20250b431b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -50,8 +50,6 @@ class DataFrameReader(object):
     (e.g. file systems, key-value stores, etc).
     Use :func:`SQLContext.read` to access this.
 
-    ::Note: Experimental
-
     .. versionadded:: 1.4
     """
 
@@ -143,6 +141,8 @@ class DataFrameReader(object):
     def stream(self, path=None, format=None, schema=None, **options):
         """Loads a data stream from a data source and returns it as a :class`DataFrame`.
 
+        .. note:: Experimental.
+
         :param path: optional string for file-system backed data sources.
         :param format: optional string for format of the data source. Default to 'parquet'.
         :param schema: optional :class:`StructType` for the input schema.
@@ -462,8 +462,6 @@ class DataFrameWriter(object):
     (e.g. file systems, key-value stores, etc).
     Use :func:`DataFrame.write` to access this.
 
-    ::Note: Experimental
-
     .. versionadded:: 1.4
     """
     def __init__(self, df):
@@ -540,7 +538,9 @@ class DataFrameWriter(object):
     def queryName(self, queryName):
         """Specifies the name of the :class:`ContinuousQuery` that can be started with
         :func:`startStream`. This name must be unique among all the currently active queries
-        in the associated SQLContext.
+        in the associated SQLContext
+
+        .. note:: Experimental.
 
         :param queryName: unique name for the query
 
@@ -557,6 +557,8 @@ class DataFrameWriter(object):
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
 
+        .. note:: Experimental.
+
         :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
 
         >>> # trigger the query for execution every 5 seconds
@@ -614,6 +616,8 @@ class DataFrameWriter(object):
         If ``format`` is not specified, the default data source configured by
         ``spark.sql.sources.default`` will be used.
 
+        .. note:: Experimental.
+
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 5bf696c1c3..15d09e3edd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -34,13 +34,11 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
 
 /**
- * :: Experimental ::
- * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
+ * Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
  * key-value stores, etc) or data streams. Use [[SparkSession.read]] to access this.
  *
  * @since 1.4.0
  */
-@Experimental
 class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   /**
@@ -164,11 +162,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }
 
   /**
+   * :: Experimental ::
    * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
    * (e.g. external key-value stores).
    *
    * @since 2.0.0
    */
+  @Experimental
   def stream(): DataFrame = {
     val dataSource =
       DataSource(
@@ -180,10 +180,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }
 
   /**
+   * :: Experimental ::
    * Loads input in as a [[DataFrame]], for data streams that read from some path.
    *
    * @since 2.0.0
    */
+  @Experimental
   def stream(path: String): DataFrame = {
     option("path", path).stream()
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6b1ccbec61..da9d25443e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -34,13 +34,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
 /**
- * :: Experimental ::
- * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
+ * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
+ * key-value stores, etc) or data streams. Use [[Dataset.write]] to access this.
  *
  * @since 1.4.0
  */
-@Experimental
 final class DataFrameWriter private[sql](df: DataFrame) {
 
   /**
@@ -255,11 +253,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * :: Experimental ::
    * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
    * This name must be unique among all the currently active queries in the associated SQLContext.
    *
    * @since 2.0.0
    */
+  @Experimental
   def queryName(queryName: String): DataFrameWriter = {
     assertStreaming("queryName() can only be called on continuous queries")
     this.extraOptions += ("queryName" -> queryName)
@@ -267,25 +267,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
+  @Experimental
   def startStream(path: String): ContinuousQuery = {
     option("path", path).startStream()
   }
 
   /**
+   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
+  @Experimental
   def startStream(): ContinuousQuery = {
-    assertNotBucketed
+    assertNotBucketed()
    assertStreaming("startStream() can only be called on continuous queries")
 
     if (source == "memory") {
--
cgit v1.2.3
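
For context only (not part of the patch): a minimal, hypothetical Scala sketch of how the streaming entry points that this change marks @Experimental -- DataFrameReader.stream() and DataFrameWriter.queryName()/startStream() -- were chained against the pre-2.0.0 API shown in the diff. The application name, paths, query name, and the checkpointLocation option value are illustrative assumptions, not values taken from the patch.

// Hypothetical usage sketch of the APIs tagged @Experimental in this patch.
// Paths, names, and option values below are illustrative assumptions.
import org.apache.spark.sql.SparkSession

object ExperimentalStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stream-sketch").getOrCreate()

    // DataFrameReader.stream(path): load a file-backed data stream as a DataFrame.
    val input = spark.read
      .format("parquet")            // "parquet" is the default source if none is given
      .stream("/tmp/stream-input")  // assumed input directory

    // DataFrameWriter.queryName()/startStream(): start a ContinuousQuery that keeps
    // writing results to the given path as new data arrives.
    val query = input.write
      .format("parquet")
      .option("checkpointLocation", "/tmp/stream-checkpoint")  // assumed option value
      .queryName("sketch_query")    // must be unique among active queries
      .startStream("/tmp/stream-output")

    query.awaitTermination()        // block until the continuous query stops
  }
}

The patch does not change this call chain; it only narrows the @Experimental annotation from the whole reader/writer classes down to these streaming-specific methods.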