author     Reynold Xin <rxin@databricks.com>    2016-10-06 10:33:45 -0700
committer  Shixiong Zhu <shixiong@databricks.com>    2016-10-06 10:33:45 -0700
commit     79accf45ace5549caa0cbab02f94fc87bedb5587 (patch)
tree       1ec1b153faa309a144a90278366d0f70bbef4324
parent     92b7e5728025b1bb6ed3aab5f1753c946a73568c (diff)
[SPARK-17798][SQL] Remove redundant Experimental annotations in sql.streaming
## What changes were proposed in this pull request?

I was looking through API annotations to catch mislabeled APIs, and realized that the DataStreamReader and DataStreamWriter classes are already annotated as Experimental, so there is no need to annotate each method within them.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #15373 from rxin/SPARK-17798.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala        28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala        29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala   4
3 files changed, 1 insertion(+), 60 deletions(-)
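The rationale above is that a class-level `@Experimental` tag already covers every member, so repeating it on each method adds nothing. A minimal sketch of that pattern follows; it is not Spark source, the class name `ExampleStreamReader` is hypothetical, and it only assumes the public `org.apache.spark.annotation.Experimental` annotation from the spark-tags module.

```scala
import org.apache.spark.annotation.Experimental

// The class-level tag marks the entire public surface of the class as
// experimental, so the individual builder methods need no tag of their own.
@Experimental
final class ExampleStreamReader {   // hypothetical stand-in for DataStreamReader

  /**
   * Specifies the input data source format.
   * No per-method @Experimental annotation: the class-level one already applies.
   */
  def format(source: String): ExampleStreamReader = this
}
```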
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index d437c16a25..864a9cd3eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -35,89 +35,73 @@ import org.apache.spark.sql.types.StructType
@Experimental
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
/**
- * :: Experimental ::
* Specifies the input data source format.
*
* @since 2.0.0
*/
- @Experimental
def format(source: String): DataStreamReader = {
this.source = source
this
}
/**
- * :: Experimental ::
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can
* skip the schema inference step, and thus speed up data loading.
*
* @since 2.0.0
*/
- @Experimental
def schema(schema: StructType): DataStreamReader = {
this.userSpecifiedSchema = Option(schema)
this
}
/**
- * :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: String): DataStreamReader = {
this.extraOptions += (key -> value)
this
}
/**
- * :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
/**
- * :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
/**
- * :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
/**
- * :: Experimental ::
* (Scala-specific) Adds input options for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def options(options: scala.collection.Map[String, String]): DataStreamReader = {
this.extraOptions ++= options
this
}
/**
- * :: Experimental ::
* Adds input options for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def options(options: java.util.Map[String, String]): DataStreamReader = {
this.options(options.asScala)
this
@@ -125,13 +109,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
/**
- * :: Experimental ::
* Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
- @Experimental
def load(): DataFrame = {
val dataSource =
DataSource(
@@ -143,18 +125,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}
/**
- * :: Experimental ::
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
- @Experimental
def load(path: String): DataFrame = {
option("path", path).load()
}
/**
- * :: Experimental ::
* Loads a JSON file stream (one object per line) and returns the result as a [[DataFrame]].
*
* This function goes through the input once to determine the input schema. If you know the
@@ -198,11 +177,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
- @Experimental
def json(path: String): DataFrame = format("json").load(path)
/**
- * :: Experimental ::
* Loads a CSV file stream and returns the result as a [[DataFrame]].
*
* This function will go through the input once to determine the input schema if `inferSchema`
@@ -262,11 +239,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
- @Experimental
def csv(path: String): DataFrame = format("csv").load(path)
/**
- * :: Experimental ::
* Loads a Parquet file stream, returning the result as a [[DataFrame]].
*
* You can set the following Parquet-specific option(s) for reading Parquet files:
@@ -281,13 +256,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
- @Experimental
def parquet(path: String): DataFrame = {
format("parquet").load(path)
}
/**
- * :: Experimental ::
* Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
@@ -308,7 +281,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
- @Experimental
def text(path: String): DataFrame = format("text").load(path)
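A brief usage sketch of how the DataStreamReader builder methods diffed above chain together in practice. The schema, option value, and input path are illustrative, not taken from the patch; the sketch only assumes the standard `SparkSession.readStream` entry point from the same Spark 2.x API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("reader-sketch").getOrCreate()

// Supplying the schema up front lets the source skip schema inference,
// as the schema() scaladoc above notes.
val schema = new StructType()
  .add("id", LongType)
  .add("payload", StringType)

val events = spark.readStream
  .format("json")                       // format(), as diffed above
  .schema(schema)
  .option("maxFilesPerTrigger", "10")   // an option the file sources understand
  .load("/tmp/input-json")              // hypothetical input path
```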
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index f70c7d08a6..b959444b49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -37,7 +37,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private val df = ds.toDF()
/**
- * :: Experimental ::
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
* written to the sink
@@ -46,15 +45,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
- @Experimental
def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
this.outputMode = outputMode
this
}
-
/**
- * :: Experimental ::
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
* the sink
@@ -63,7 +59,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
- @Experimental
def outputMode(outputMode: String): DataStreamWriter[T] = {
this.outputMode = outputMode.toLowerCase match {
case "append" =>
@@ -78,7 +73,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
- * :: Experimental ::
* Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
* the query as fast as possible.
*
@@ -100,7 +94,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
- @Experimental
def trigger(trigger: Trigger): DataStreamWriter[T] = {
this.trigger = trigger
this
@@ -108,25 +101,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
- * :: Experimental ::
* Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
*/
- @Experimental
def queryName(queryName: String): DataStreamWriter[T] = {
this.extraOptions += ("queryName" -> queryName)
this
}
/**
- * :: Experimental ::
* Specifies the underlying output data source. Built-in options include "parquet" for now.
*
* @since 2.0.0
*/
- @Experimental
def format(source: String): DataStreamWriter[T] = {
this.source = source
this
@@ -156,90 +145,74 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
- * :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: String): DataStreamWriter[T] = {
this.extraOptions += (key -> value)
this
}
/**
- * :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
/**
- * :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
/**
- * :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
/**
- * :: Experimental ::
* (Scala-specific) Adds output options for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
this.extraOptions ++= options
this
}
/**
- * :: Experimental ::
* Adds output options for the underlying data source.
*
* @since 2.0.0
*/
- @Experimental
def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
this.options(options.asScala)
this
}
/**
- * :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
- @Experimental
def start(path: String): StreamingQuery = {
option("path", path).start()
}
/**
- * :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
- @Experimental
def start(): StreamingQuery = {
if (source == "memory") {
assertNotPartitioned("memory")
@@ -297,7 +270,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
- * :: Experimental ::
* Starts the execution of the streaming query, which will continually send results to the given
* [[ForeachWriter]] as new data arrives. The [[ForeachWriter]] can be used to send the data
* generated by the [[DataFrame]]/[[Dataset]] to an external system.
@@ -343,7 +315,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
- @Experimental
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
this.source = "foreach"
this.foreachWriter = if (writer != null) {
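A matching sketch for the DataStreamWriter side, assuming `events` is a streaming DataFrame such as the one built in the reader sketch above. All paths are illustrative; `ProcessingTime` is the trigger constructor available in this era of the API (later releases also offer `Trigger.ProcessingTime`).

```scala
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}

// `events` is assumed to be a streaming DataFrame; all paths are illustrative.
val query = events.writeStream
  .queryName("events_to_parquet")             // must be unique among active queries
  .format("parquet")
  .outputMode(OutputMode.Append())            // only newly arrived rows are written
  .trigger(ProcessingTime("10 seconds"))      // default is ProcessingTime(0): as fast as possible
  .option("checkpointLocation", "/tmp/chk")   // hypothetical checkpoint path
  .start("/tmp/output-parquet")               // returns a StreamingQuery

query.awaitTermination()
```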
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index db606abb8c..8a8855d85a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -35,7 +35,7 @@ abstract class StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
- * [[org.apache.spark.sql.DataStreamWriter `DataStreamWriter.start()`]],
+ * [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
* that is, `onQueryStart` will be called on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
* don't block this method as it will block your query.
@@ -101,8 +101,6 @@ object StreamingQueryListener {
* @param queryInfo Information about the status of the query.
* @param exception The exception message of the [[StreamingQuery]] if the query was terminated
* with an exception. Otherwise, it will be `None`.
- * @param stackTrace The stack trace of the exception if the query was terminated with an
- * exception. It will be empty if there was no error.
* @since 2.0.0
*/
@Experimental
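The listener contract touched above (onQueryStarted runs synchronously with `DataStreamWriter.start()`, so it must not block) can be illustrated with a minimal registration sketch. It uses the event class names from later Spark 2.x releases (QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent); the exact names at this commit may differ.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class LoggingListener extends StreamingQueryListener {
  // Runs synchronously with DataStreamWriter.start(); keep it cheap and non-blocking.
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = ()

  // `exception` is the exception message, or None if the query stopped cleanly.
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    event.exception.foreach(msg => println(s"query failed: $msg"))
}

SparkSession.builder().getOrCreate().streams.addListener(new LoggingListener)
```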