Diffstat (limited to 'sql')
8 files changed, 86 insertions, 315 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6de17e5924..84203bbfef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1682,7 +1682,7 @@ class DataFrame private[sql](
 
   /**
    * :: Experimental ::
-   * Interface for saving the content of the [[DataFrame]] out into external storage.
+   * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
    *
    * @group output
    * @since 1.4.0
@@ -1691,14 +1691,6 @@ class DataFrame private[sql](
   def write: DataFrameWriter = new DataFrameWriter(this)
 
   /**
-   * :: Experimental ::
-   * Interface for starting a streaming query that will continually output results to the specified
-   * external sink as new data arrives.
-   */
-  @Experimental
-  def streamTo: DataStreamWriter = new DataStreamWriter(this)
-
-  /**
    * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
    * @group rdd
    * @since 1.3.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 2e0c6c7df9..a58643a5ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,17 +29,17 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{CatalystQl}
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.JSONRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
 
 /**
  * :: Experimental ::
  * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[SQLContext.read]] to access this.
+ * key-value stores, etc) or data streams. Use [[SQLContext.read]] to access this.
  *
  * @since 1.4.0
  */
@@ -137,6 +137,30 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
+   * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
+   * (e.g. external key-value stores).
+   *
+   * @since 2.0.0
+   */
+  def stream(): DataFrame = {
+    val resolved = ResolvedDataSource.createSource(
+      sqlContext,
+      userSpecifiedSchema = userSpecifiedSchema,
+      providerName = source,
+      options = extraOptions.toMap)
+    DataFrame(sqlContext, StreamingRelation(resolved))
+  }
+
+  /**
+   * Loads input in as a [[DataFrame]], for data streams that read from some path.
+   *
+   * @since 2.0.0
+   */
+  def stream(path: String): DataFrame = {
+    option("path", path).stream()
+  }
+
+  /**
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
    *
@@ -165,7 +189,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
    *                             should be included.
-   *
    * @since 1.4.0
    */
   def jdbc(
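With this change, streaming input goes through the same builder as batch input. A minimal sketch of the new reader entry point added above; the format name, schema value, and path are illustrative placeholders, not part of this patch:

    // Sketch only: "my.streaming.source", mySchema and the path are assumed names.
    val streamingDf: DataFrame = sqlContext.read
      .format("my.streaming.source")   // any provider that can create a streaming source
      .schema(mySchema)                // optional: skips schema inference on the stream
      .stream("/data/incoming")        // or stream() for sources that need no path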
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12eb239363..80447fefe1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,17 +22,18 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.sources.HadoopFsRelation
 
 /**
  * :: Experimental ::
  * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[DataFrame.write]] to access this.
+ * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
  *
  * @since 1.4.0
  */
@@ -184,6 +185,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * Starts the execution of the streaming query, which will continually output results to the given
+   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  def stream(path: String): ContinuousQuery = {
+    option("path", path).stream()
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually output results to the given
+   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  def stream(): ContinuousQuery = {
+    val sink = ResolvedDataSource.createSink(
+      df.sqlContext,
+      source,
+      extraOptions.toMap,
+      normalizedParCols.getOrElse(Nil))
+
+    new StreamExecution(df.sqlContext, df.logicalPlan, sink)
+  }
+
+  /**
    * Inserts the content of the [[DataFrame]] to the specified table. It requires that
    * the schema of the [[DataFrame]] is the same as the schema of the table.
    *
@@ -255,7 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
   /**
    * The given column name may not be equal to any of the existing column names if we were in
-   * case-insensitive context. Normalize the given column name to the real one so that we don't
+   * case-insensitive context. Normalize the given column name to the real one so that we don't
    * need to care about case sensitivity afterwards.
    */
   private def normalize(columnName: String, columnType: String): String = {
@@ -339,7 +368,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
    *                             should be included.
-   *
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
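The writer side mirrors the reader: DataFrame.write now resolves a streaming sink and returns a ContinuousQuery handle backed by a StreamExecution. A sketch under the same placeholder assumptions (format and path are illustrative):

    // Sketch only: "my.streaming.sink" and the output path are assumed names.
    val query: ContinuousQuery = streamingDf.write
      .format("my.streaming.sink")
      .stream("/data/out")   // starts the continuous query immediately
    // ... later, stop it via the returned handle, as the tests below do
    query.stop()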
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
deleted file mode 100644
index 2febc93fa4..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.Logging
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
-import org.apache.spark.sql.execution.streaming.StreamingRelation
-import org.apache.spark.sql.types.StructType
-
-/**
- * :: Experimental ::
- * An interface to reading streaming data. Use `sqlContext.streamFrom` to access these methods.
- *
- * {{{
- *   val df = sqlContext.streamFrom
- *     .format("...")
- *     .open()
- * }}}
- */
-@Experimental
-class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
-
-  /**
-   * Specifies the input data source format.
-   *
-   * @since 2.0.0
-   */
-  def format(source: String): DataStreamReader = {
-    this.source = source
-    this
-  }
-
-  /**
-   * Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
-   * automatically from data. By specifying the schema here, the underlying data stream can
-   * skip the schema inference step, and thus speed up data reading.
-   *
-   * @since 2.0.0
-   */
-  def schema(schema: StructType): DataStreamReader = {
-    this.userSpecifiedSchema = Option(schema)
-    this
-  }
-
-  /**
-   * Adds an input option for the underlying data stream.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: String): DataStreamReader = {
-    this.extraOptions += (key -> value)
-    this
-  }
-
-  /**
-   * (Scala-specific) Adds input options for the underlying data stream.
-   *
-   * @since 2.0.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
-    this.extraOptions ++= options
-    this
-  }
-
-  /**
-   * Adds input options for the underlying data stream.
-   *
-   * @since 2.0.0
-   */
-  def options(options: java.util.Map[String, String]): DataStreamReader = {
-    this.options(options.asScala)
-    this
-  }
-
-  /**
-   * Loads streaming input in as a [[DataFrame]], for data streams that don't require a path (e.g.
-   * external key-value stores).
-   *
-   * @since 2.0.0
-   */
-  def open(): DataFrame = {
-    val resolved = ResolvedDataSource.createSource(
-      sqlContext,
-      userSpecifiedSchema = userSpecifiedSchema,
-      providerName = source,
-      options = extraOptions.toMap)
-    DataFrame(sqlContext, StreamingRelation(resolved))
-  }
-
-  /**
-   * Loads input in as a [[DataFrame]], for data streams that read from some path.
-   *
-   * @since 2.0.0
-   */
-  def open(path: String): DataFrame = {
-    option("path", path).open()
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  ///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = sqlContext.conf.defaultDataSourceName
-
-  private var userSpecifiedSchema: Option[StructType] = None
-
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
deleted file mode 100644
index b325d48fcb..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
-import org.apache.spark.sql.execution.streaming.StreamExecution
-
-/**
- * :: Experimental ::
- * Interface used to start a streaming query query execution.
- *
- * @since 2.0.0
- */
-@Experimental
-final class DataStreamWriter private[sql](df: DataFrame) {
-
-  /**
-   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
-   *
-   * @since 2.0.0
-   */
-  def format(source: String): DataStreamWriter = {
-    this.source = source
-    this
-  }
-
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: String): DataStreamWriter = {
-    this.extraOptions += (key -> value)
-    this
-  }
-
-  /**
-   * (Scala-specific) Adds output options for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
-    this.extraOptions ++= options
-    this
-  }
-
-  /**
-   * Adds output options for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def options(options: java.util.Map[String, String]): DataStreamWriter = {
-    this.options(options.asScala)
-    this
-  }
-
-  /**
-   * Partitions the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme.\
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataStreamWriter = {
-    this.partitioningColumns = colNames
-    this
-  }
-
-  /**
-   * Starts the execution of the streaming query, which will continually output results to the given
-   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
-   * the stream.
-   * @since 2.0.0
-   */
-  def start(path: String): ContinuousQuery = {
-    this.extraOptions += ("path" -> path)
-    start()
-  }
-
-  /**
-   * Starts the execution of the streaming query, which will continually output results to the given
-   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
-   * the stream.
-   *
-   * @since 2.0.0
-   */
-  def start(): ContinuousQuery = {
-    val sink = ResolvedDataSource.createSink(
-      df.sqlContext,
-      source,
-      extraOptions.toMap,
-      normalizedParCols)
-
-    new StreamExecution(df.sqlContext, df.logicalPlan, sink)
-  }
-
-  private def normalizedParCols: Seq[String] = {
-    partitioningColumns.map { col =>
-      df.logicalPlan.output
-        .map(_.name)
-        .find(df.sqlContext.analyzer.resolver(_, col))
-        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
-          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
-    }
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  ///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = df.sqlContext.conf.defaultDataSourceName
-
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
-
-  private var partitioningColumns: Seq[String] = Nil
-
-}
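Both DataStream classes are deleted outright rather than deprecated, so existing callers switch to the merged API. The mapping is mechanical, as the updated test suite below shows:

    // Before (removed in this patch):
    //   sqlContext.streamFrom.format("...").open()   // returned a DataFrame
    //   df.streamTo.format("...").start()            // returned a ContinuousQuery
    // After:
    //   sqlContext.read.format("...").stream()
    //   df.write.format("...").stream()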
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 13700be068..1661fdbec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -579,10 +579,9 @@ class SQLContext private[sql](
     DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
   }
 
-
   /**
    * :: Experimental ::
-   * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
+   * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
    * {{{
    *   sqlContext.read.parquet("/path/to/file.parquet")
    *   sqlContext.read.schema(schema).json("/path/to/file.json")
@@ -594,14 +593,6 @@ class SQLContext private[sql](
   @Experimental
   def read: DataFrameReader = new DataFrameReader(this)
 
-
-  /**
-   * :: Experimental ::
-   * Returns a [[DataStreamReader]] than can be used to access data continuously as it arrives.
-   */
-  @Experimental
-  def streamFrom: DataStreamReader = new DataStreamReader(this)
-
   /**
    * :: Experimental ::
    * Creates an external table from the given path and returns the corresponding DataFrame.
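With streamFrom gone, SQLContext.read is the single entry point for both batch and streaming input, and the reader's existing builder methods (schema, option, options) apply to streams unchanged; the collected options are handed to the source through ResolvedDataSource.createSource. A sketch of the no-path form; the provider class and option key are hypothetical:

    val kvStream = sqlContext.read
      .format("org.example.KeyValueStreamSource")   // hypothetical provider name
      .option("host", "localhost:9999")             // illustrative option, passed through as-is
      .stream()                                     // no path needed for this kind of source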
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index e3065ac5f8..7702f535ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -122,7 +122,6 @@ object ResolvedDataSource extends Logging {
     provider.createSink(sqlContext, options, partitionColumns)
   }
 
-
   /** Create a [[ResolvedDataSource]] for reading data in. */
   def apply(
       sqlContext: SQLContext,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
index 1dab6ebf1b..b36b41cac9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
@@ -60,22 +60,22 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
 
   test("resolve default source") {
-    sqlContext.streamFrom
+    sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
-      .open()
-      .streamTo
+      .stream()
+      .write
       .format("org.apache.spark.sql.streaming.test")
-      .start()
+      .stream()
       .stop()
   }
 
   test("resolve full class") {
-    sqlContext.streamFrom
+    sqlContext.read
       .format("org.apache.spark.sql.streaming.test.DefaultSource")
-      .open()
-      .streamTo
+      .stream()
+      .write
       .format("org.apache.spark.sql.streaming.test")
-      .start()
+      .stream()
       .stop()
   }
 
@@ -83,12 +83,12 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
     val map = new java.util.HashMap[String, String]
     map.put("opt3", "3")
 
-    val df = sqlContext.streamFrom
+    val df = sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "1")
       .options(Map("opt2" -> "2"))
       .options(map)
-      .open()
+      .stream()
 
     assert(LastOptions.parameters("opt1") == "1")
     assert(LastOptions.parameters("opt2") == "2")
@@ -96,12 +96,12 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
 
     LastOptions.parameters = null
 
-    df.streamTo
+    df.write
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "1")
       .options(Map("opt2" -> "2"))
       .options(map)
-      .start()
+      .stream()
       .stop()
 
     assert(LastOptions.parameters("opt1") == "1")
@@ -110,54 +110,53 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
 
   test("partitioning") {
-    val df = sqlContext.streamFrom
+    val df = sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
-      .open()
+      .stream()
 
-    df.streamTo
+    df.write
       .format("org.apache.spark.sql.streaming.test")
-      .start()
+      .stream()
       .stop()
     assert(LastOptions.partitionColumns == Nil)
 
-    df.streamTo
+    df.write
      .format("org.apache.spark.sql.streaming.test")
      .partitionBy("a")
-      .start()
+      .stream()
      .stop()
    assert(LastOptions.partitionColumns == Seq("a"))
 
-
    withSQLConf("spark.sql.caseSensitive" -> "false") {
-      df.streamTo
+      df.write
        .format("org.apache.spark.sql.streaming.test")
        .partitionBy("A")
-        .start()
+        .stream()
        .stop()
      assert(LastOptions.partitionColumns == Seq("a"))
    }
 
    intercept[AnalysisException] {
-      df.streamTo
+      df.write
        .format("org.apache.spark.sql.streaming.test")
        .partitionBy("b")
-        .start()
+        .stream()
        .stop()
    }
  }
 
  test("stream paths") {
-    val df = sqlContext.streamFrom
+    val df = sqlContext.read
      .format("org.apache.spark.sql.streaming.test")
.open("/test") + .stream("/test") assert(LastOptions.parameters("path") == "/test") LastOptions.parameters = null - df.streamTo + df.write .format("org.apache.spark.sql.streaming.test") - .start("/test") + .stream("/test") .stop() assert(LastOptions.parameters("path") == "/test") |