From 79accf45ace5549caa0cbab02f94fc87bedb5587 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 6 Oct 2016 10:33:45 -0700
Subject: [SPARK-17798][SQL] Remove redundant Experimental annotations in sql.streaming

## What changes were proposed in this pull request?

I was looking through API annotations to catch mislabeled APIs, and realized
that the DataStreamReader and DataStreamWriter classes are already annotated
as Experimental; as a result, there is no need to annotate each method within
them.

## How was this patch tested?

N/A

Author: Reynold Xin

Closes #15373 from rxin/SPARK-17798.
---
 .../spark/sql/streaming/DataStreamReader.scala     | 28 ---------------------
 .../spark/sql/streaming/DataStreamWriter.scala     | 29 ----------------------
 .../sql/streaming/StreamingQueryListener.scala     |  4 +--
 3 files changed, 1 insertion(+), 60 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index d437c16a25..864a9cd3eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -35,89 +35,73 @@ import org.apache.spark.sql.types.StructType
 @Experimental
 final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
-   * :: Experimental ::
    * Specifies the input data source format.
    *
    * @since 2.0.0
    */
-  @Experimental
   def format(source: String): DataStreamReader = {
     this.source = source
     this
   }
 
   /**
-   * :: Experimental ::
    * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
    * automatically from data. By specifying the schema here, the underlying data source can
    * skip the schema inference step, and thus speed up data loading.
    *
    * @since 2.0.0
    */
-  @Experimental
   def schema(schema: StructType): DataStreamReader = {
     this.userSpecifiedSchema = Option(schema)
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: String): DataStreamReader = {
     this.extraOptions += (key -> value)
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * (Scala-specific) Adds input options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: scala.collection.Map[String, String]): DataStreamReader = {
     this.extraOptions ++= options
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds input options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: java.util.Map[String, String]): DataStreamReader = {
     this.options(options.asScala)
     this
@@ -125,13 +109,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 
   /**
-   * :: Experimental ::
    * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
    * (e.g. external key-value stores).
    *
    * @since 2.0.0
    */
-  @Experimental
   def load(): DataFrame = {
     val dataSource =
       DataSource(
@@ -143,18 +125,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   }
 
   /**
-   * :: Experimental ::
    * Loads input in as a [[DataFrame]], for data streams that read from some path.
    *
    * @since 2.0.0
    */
-  @Experimental
   def load(path: String): DataFrame = {
     option("path", path).load()
   }
 
   /**
-   * :: Experimental ::
    * Loads a JSON file stream (one object per line) and returns the result as a [[DataFrame]].
    *
    * This function goes through the input once to determine the input schema. If you know the
@@ -198,11 +177,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def json(path: String): DataFrame = format("json").load(path)
 
   /**
-   * :: Experimental ::
    * Loads a CSV file stream and returns the result as a [[DataFrame]].
    *
    * This function will go through the input once to determine the input schema if `inferSchema`
@@ -262,11 +239,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def csv(path: String): DataFrame = format("csv").load(path)
 
   /**
-   * :: Experimental ::
    * Loads a Parquet file stream, returning the result as a [[DataFrame]].
    *
    * You can set the following Parquet-specific option(s) for reading Parquet files:
@@ -281,13 +256,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def parquet(path: String): DataFrame = {
     format("parquet").load(path)
   }
 
   /**
-   * :: Experimental ::
    * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
    *
@@ -308,7 +281,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index f70c7d08a6..b959444b49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -37,7 +37,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   private val df = ds.toDF()
 
   /**
-   * :: Experimental ::
    * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
    *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
    *     written to the sink
@@ -46,15 +45,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
     this.outputMode = outputMode
     this
   }
 
-
   /**
-   * :: Experimental ::
    * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
    *   - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
    *     the sink
@@ -63,7 +59,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def outputMode(outputMode: String): DataStreamWriter[T] = {
     this.outputMode = outputMode.toLowerCase match {
       case "append" =>
@@ -78,7 +73,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
    * the query as fast as possible.
    *
@@ -100,7 +94,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def trigger(trigger: Trigger): DataStreamWriter[T] = {
     this.trigger = trigger
     this
@@ -108,25 +101,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
    * This name must be unique among all the currently active queries in the associated SQLContext.
    *
    * @since 2.0.0
    */
-  @Experimental
   def queryName(queryName: String): DataStreamWriter[T] = {
     this.extraOptions += ("queryName" -> queryName)
     this
   }
 
   /**
-   * :: Experimental ::
    * Specifies the underlying output data source. Built-in options include "parquet" for now.
    *
    * @since 2.0.0
    */
-  @Experimental
   def format(source: String): DataStreamWriter[T] = {
     this.source = source
     this
@@ -156,90 +145,74 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: String): DataStreamWriter[T] = {
     this.extraOptions += (key -> value)
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * (Scala-specific) Adds output options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
     this.extraOptions ++= options
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds output options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
     this.options(options.asScala)
     this
   }
 
   /**
-   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
-  @Experimental
   def start(path: String): StreamingQuery = {
     option("path", path).start()
   }
 
   /**
-   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
-  @Experimental
   def start(): StreamingQuery = {
     if (source == "memory") {
       assertNotPartitioned("memory")
@@ -297,7 +270,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually send results to the given
    * [[ForeachWriter]] as new data arrives. The [[ForeachWriter]] can be used to send the data
    * generated by the [[DataFrame]]/[[Dataset]] to an external system.
@@ -343,7 +315,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
     this.source = "foreach"
     this.foreachWriter = if (writer != null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index db606abb8c..8a8855d85a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -35,7 +35,7 @@ abstract class StreamingQueryListener {
   /**
    * Called when a query is started.
    * @note This is called synchronously with
-   *       [[org.apache.spark.sql.DataStreamWriter `DataStreamWriter.start()`]],
+   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
    *       that is, `onQueryStart` will be called on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
    *       don't block this method as it will block your query.
@@ -101,8 +101,6 @@ object StreamingQueryListener {
    * @param queryInfo Information about the status of the query.
    * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
-   * @param stackTrace The stack trace of the exception if the query was terminated with an
-   *                   exception. It will be empty if there was no error.
    * @since 2.0.0
    */
   @Experimental
-- 
cgit v1.2.3
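
For readers unfamiliar with the convention this patch relies on: in Spark, a stability annotation such as `@Experimental` applied at the class level is understood to cover the entire public surface of that class, so repeating the annotation (and the `:: Experimental ::` scaladoc marker) on every method adds noise without adding information. A minimal sketch of the before/after shape, using a hypothetical `ExampleReader` class rather than the real Spark sources:

```scala
import org.apache.spark.annotation.Experimental

// One class-level annotation marks the whole public API of the class
// as experimental.
@Experimental
final class ExampleReader {

  // Before this patch, methods like this one also carried an
  // `@Experimental` annotation and a ":: Experimental ::" scaladoc
  // header; both are redundant given the class-level annotation,
  // which is why the patch simply deletes them.
  def format(source: String): ExampleReader = this
}
```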