author     Reynold Xin <rxin@databricks.com>    2016-05-10 21:54:32 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-05-10 21:54:32 -0700
commit     5a5b83c97bbab1d717dcc30b09aafb7c0ed85069 (patch)
tree       f5e53a6249aa63f2cbbabed2021a28901619a5a2
parent     61e0bdcff2ed57b22541fb3c03146d6eec2bb70f (diff)
[SPARK-15261][SQL] Remove experimental tag from DataFrameReader/Writer
## What changes were proposed in this pull request?

This patch removes the experimental tag from DataFrameReader and DataFrameWriter, and explicitly tags a few methods added for Structured Streaming as experimental.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #13038 from rxin/SPARK-15261.
-rw-r--r--  python/pyspark/sql/readwriter.py                                      14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala     8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala    14
3 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e2ee9db049..20250b431b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -50,8 +50,6 @@ class DataFrameReader(object):
(e.g. file systems, key-value stores, etc). Use :func:`SQLContext.read`
to access this.
- ::Note: Experimental
-
.. versionadded:: 1.4
"""
@@ -143,6 +141,8 @@ class DataFrameReader(object):
def stream(self, path=None, format=None, schema=None, **options):
"""Loads a data stream from a data source and returns it as a :class`DataFrame`.
+ .. note:: Experimental.
+
:param path: optional string for file-system backed data sources.
:param format: optional string for format of the data source. Default to 'parquet'.
:param schema: optional :class:`StructType` for the input schema.
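For illustration, a minimal sketch of how this experimental reader method could be called; the `sqlContext` handle, the input path, and the schema below are assumptions, not part of the patch:

```python
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema and input directory; stream() returns a streaming DataFrame.
schema = StructType([StructField("value", StringType(), True)])

# The reader is reached through SQLContext.read, as the class docstring notes.
streaming_df = sqlContext.read.stream(path="/tmp/streaming-input",
                                      format="json",
                                      schema=schema)
```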
@@ -462,8 +462,6 @@ class DataFrameWriter(object):
(e.g. file systems, key-value stores, etc). Use :func:`DataFrame.write`
to access this.
- ::Note: Experimental
-
.. versionadded:: 1.4
"""
def __init__(self, df):
@@ -540,7 +538,9 @@ class DataFrameWriter(object):
def queryName(self, queryName):
"""Specifies the name of the :class:`ContinuousQuery` that can be started with
:func:`startStream`. This name must be unique among all the currently active queries
- in the associated SQLContext.
+ in the associated SQLContext
+
+ .. note:: Experimental.
:param queryName: unique name for the query
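A hedged one-line illustration of the call described above; `sdf` stands in for an arbitrary streaming DataFrame and the query name is made up:

```python
# Name the continuous query so it can be told apart from the other
# active queries in the same SQLContext.
writer = sdf.write.queryName("word_counts")
```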
@@ -557,6 +557,8 @@ class DataFrameWriter(object):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+ .. note:: Experimental.
+
:param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
>>> # trigger the query for execution every 5 seconds
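Sketching the same idea in full (again assuming a streaming DataFrame `sdf`; the interval is only an example):

```python
# Fire the query every 5 seconds; leaving trigger() unset runs it as fast
# as possible, i.e. the equivalent of processingTime='0 seconds'.
writer = sdf.write.trigger(processingTime='5 seconds')
```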
@@ -614,6 +616,8 @@ class DataFrameWriter(object):
If ``format`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
+ .. note:: Experimental.
+
:param path: the path in a Hadoop supported file system
:param format: the format used to save
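Putting the experimental pieces together, a minimal end-to-end sketch; every path, name, and schema below is an illustrative assumption rather than anything the patch specifies:

```python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("value", StringType(), True)])

# Read a stream, then start a named continuous query that writes Parquet output.
streaming_df = sqlContext.read.stream(path="/tmp/streaming-input",
                                      format="json",
                                      schema=schema)
query = (streaming_df.write
         .format("parquet")
         .queryName("word_counts")
         .trigger(processingTime='5 seconds')
         # Checkpoint directory; whether this sink requires one is an assumption.
         .option("checkpointLocation", "/tmp/streaming-checkpoints")
         .startStream(path="/tmp/streaming-output"))
```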
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 5bf696c1c3..15d09e3edd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -34,13 +34,11 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
/**
- * :: Experimental ::
- * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
+ * Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
* key-value stores, etc) or data streams. Use [[SparkSession.read]] to access this.
*
* @since 1.4.0
*/
-@Experimental
class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
@@ -164,11 +162,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}
/**
+ * :: Experimental ::
* Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
+ @Experimental
def stream(): DataFrame = {
val dataSource =
DataSource(
@@ -180,10 +180,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}
/**
+ * :: Experimental ::
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
+ @Experimental
def stream(path: String): DataFrame = {
option("path", path).stream()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6b1ccbec61..da9d25443e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -34,13 +34,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
/**
- * :: Experimental ::
- * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
+ * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
+ * key-value stores, etc) or data streams. Use [[Dataset.write]] to access this.
*
* @since 1.4.0
*/
-@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {
/**
@@ -255,11 +253,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * :: Experimental ::
* Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
*/
+ @Experimental
def queryName(queryName: String): DataFrameWriter = {
assertStreaming("queryName() can only be called on continuous queries")
this.extraOptions += ("queryName" -> queryName)
@@ -267,25 +267,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
+ @Experimental
def startStream(path: String): ContinuousQuery = {
option("path", path).startStream()
}
/**
+ * :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
+ @Experimental
def startStream(): ContinuousQuery = {
- assertNotBucketed
+ assertNotBucketed()
assertStreaming("startStream() can only be called on continuous queries")
if (source == "memory") {