author     Reynold Xin <rxin@databricks.com>          2016-02-03 16:10:11 -0800
committer  Michael Armbrust <michael@databricks.com>  2016-02-03 16:10:11 -0800
commit     915a75398ecbccdbf9a1e07333104c857ae1ce5e (patch)
tree       94310aa5797bbeca8d142373c199b6dc68fb049f
parent     3221eddb8f9728f65c579969a3a88baeeb7577a9 (diff)
[SPARK-13166][SQL] Remove DataStreamReader/Writer
They seem redundant and we can simply use DataFrameReader/Writer. The new usage looks like:

```scala
val df = sqlContext.read.stream("...")
val handle = df.write.stream("...")
handle.stop()
```

Author: Reynold Xin <rxin@databricks.com>

Closes #11062 from rxin/SPARK-13166.
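For a fuller picture, here is a hedged sketch of the replacement API assembled from the reader/writer changes and the test suite below; the provider name, option, and paths are placeholders:

```scala
// Sketch of the new usage (provider name, option, and paths are placeholders).
// Reading a stream now goes through DataFrameReader:
val df = sqlContext.read
  .format("org.apache.spark.sql.streaming.test")  // any streaming source provider
  .option("opt1", "1")
  .stream("/input/path")                          // or .stream() for path-less sources

// Writing goes through DataFrameWriter and returns a ContinuousQuery handle:
val handle = df.write
  .format("org.apache.spark.sql.streaming.test")
  .stream("/output/path")                         // or .stream() for path-less sinks
handle.stop()
```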
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 29
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 36
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala | 127
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala | 134
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 11
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala | 1
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala | 53
8 files changed, 86 insertions(+), 315 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6de17e5924..84203bbfef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1682,7 +1682,7 @@ class DataFrame private[sql](
/**
* :: Experimental ::
- * Interface for saving the content of the [[DataFrame]] out into external storage.
+ * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
*
* @group output
* @since 1.4.0
@@ -1691,14 +1691,6 @@ class DataFrame private[sql](
def write: DataFrameWriter = new DataFrameWriter(this)
/**
- * :: Experimental ::
- * Interface for starting a streaming query that will continually output results to the specified
- * external sink as new data arrives.
- */
- @Experimental
- def streamTo: DataStreamWriter = new DataStreamWriter(this)
-
- /**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
* @since 1.3.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 2e0c6c7df9..a58643a5ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,17 +29,17 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{CatalystQl}
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.JSONRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
/**
* :: Experimental ::
* Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[SQLContext.read]] to access this.
+ * key-value stores, etc) or data streams. Use [[SQLContext.read]] to access this.
*
* @since 1.4.0
*/
@@ -137,6 +137,30 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}
/**
+ * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
+ * (e.g. external key-value stores).
+ *
+ * @since 2.0.0
+ */
+ def stream(): DataFrame = {
+ val resolved = ResolvedDataSource.createSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ providerName = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, StreamingRelation(resolved))
+ }
+
+ /**
+ * Loads input in as a [[DataFrame]], for data streams that read from some path.
+ *
+ * @since 2.0.0
+ */
+ def stream(path: String): DataFrame = {
+ option("path", path).stream()
+ }
+
+ /**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
*
@@ -165,7 +189,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
- *
* @since 1.4.0
*/
def jdbc(
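The DataFrameReader changes above add two `stream` entry points that compose with the existing builder methods (`format`, `schema`, `option`). A minimal usage sketch, assuming a hypothetical streaming source provider and an illustrative schema:

```scala
// Minimal sketch of the new DataFrameReader.stream entry points.
// "my.custom.StreamProvider" and the schema are illustrative assumptions.
import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType().add("value", StringType)

// Path-less source, e.g. an external key-value store:
val kvStream = sqlContext.read
  .format("my.custom.StreamProvider")
  .schema(schema)          // supplying a schema skips inference
  .stream()

// Path-based source: stream(path) is just option("path", path) followed by stream():
val fileStream = sqlContext.read
  .format("my.custom.StreamProvider")
  .stream("/data/in")
```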
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12eb239363..80447fefe1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,17 +22,18 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.sources.HadoopFsRelation
/**
* :: Experimental ::
* Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[DataFrame.write]] to access this.
+ * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
*
* @since 1.4.0
*/
@@ -184,6 +185,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * Starts the execution of the streaming query, which will continually output results to the given
+ * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * the stream.
+ *
+ * @since 2.0.0
+ */
+ def stream(path: String): ContinuousQuery = {
+ option("path", path).stream()
+ }
+
+ /**
+ * Starts the execution of the streaming query, which will continually output results to the given
+ * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * the stream.
+ *
+ * @since 2.0.0
+ */
+ def stream(): ContinuousQuery = {
+ val sink = ResolvedDataSource.createSink(
+ df.sqlContext,
+ source,
+ extraOptions.toMap,
+ normalizedParCols.getOrElse(Nil))
+
+ new StreamExecution(df.sqlContext, df.logicalPlan, sink)
+ }
+
+ /**
* Inserts the content of the [[DataFrame]] to the specified table. It requires that
* the schema of the [[DataFrame]] is the same as the schema of the table.
*
@@ -255,7 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
/**
* The given column name may not be equal to any of the existing column names if we were in
- * case-insensitive context. Normalize the given column name to the real one so that we don't
+ * case-insensitive context. Normalize the given column name to the real one so that we don't
* need to care about case sensitivity afterwards.
*/
private def normalize(columnName: String, columnType: String): String = {
@@ -339,7 +368,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
- *
* @since 1.4.0
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
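The DataFrameWriter changes above mirror this on the write side: `stream()` resolves a sink through `ResolvedDataSource.createSink` and wraps the plan in a `StreamExecution`, returned as a `ContinuousQuery`. A minimal sketch of driving it from user code, assuming a hypothetical sink provider and partition column:

```scala
// Sketch of starting and stopping a streaming write (provider, column, and
// path are placeholders, not part of this patch).
val query: ContinuousQuery = df.write
  .format("my.custom.StreamProvider")  // hypothetical sink provider
  .partitionBy("a")                    // optional; resolved against df's columns
  .stream("/data/out")

// The returned handle controls the continuous query:
query.stop()
```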
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
deleted file mode 100644
index 2febc93fa4..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.Logging
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
-import org.apache.spark.sql.execution.streaming.StreamingRelation
-import org.apache.spark.sql.types.StructType
-
-/**
- * :: Experimental ::
- * An interface to reading streaming data. Use `sqlContext.streamFrom` to access these methods.
- *
- * {{{
- * val df = sqlContext.streamFrom
- * .format("...")
- * .open()
- * }}}
- */
-@Experimental
-class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
-
- /**
- * Specifies the input data source format.
- *
- * @since 2.0.0
- */
- def format(source: String): DataStreamReader = {
- this.source = source
- this
- }
-
- /**
- * Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
- * automatically from data. By specifying the schema here, the underlying data stream can
- * skip the schema inference step, and thus speed up data reading.
- *
- * @since 2.0.0
- */
- def schema(schema: StructType): DataStreamReader = {
- this.userSpecifiedSchema = Option(schema)
- this
- }
-
- /**
- * Adds an input option for the underlying data stream.
- *
- * @since 2.0.0
- */
- def option(key: String, value: String): DataStreamReader = {
- this.extraOptions += (key -> value)
- this
- }
-
- /**
- * (Scala-specific) Adds input options for the underlying data stream.
- *
- * @since 2.0.0
- */
- def options(options: scala.collection.Map[String, String]): DataStreamReader = {
- this.extraOptions ++= options
- this
- }
-
- /**
- * Adds input options for the underlying data stream.
- *
- * @since 2.0.0
- */
- def options(options: java.util.Map[String, String]): DataStreamReader = {
- this.options(options.asScala)
- this
- }
-
- /**
- * Loads streaming input in as a [[DataFrame]], for data streams that don't require a path (e.g.
- * external key-value stores).
- *
- * @since 2.0.0
- */
- def open(): DataFrame = {
- val resolved = ResolvedDataSource.createSource(
- sqlContext,
- userSpecifiedSchema = userSpecifiedSchema,
- providerName = source,
- options = extraOptions.toMap)
- DataFrame(sqlContext, StreamingRelation(resolved))
- }
-
- /**
- * Loads input in as a [[DataFrame]], for data streams that read from some path.
- *
- * @since 2.0.0
- */
- def open(path: String): DataFrame = {
- option("path", path).open()
- }
-
- ///////////////////////////////////////////////////////////////////////////////////////
- // Builder pattern config options
- ///////////////////////////////////////////////////////////////////////////////////////
-
- private var source: String = sqlContext.conf.defaultDataSourceName
-
- private var userSpecifiedSchema: Option[StructType] = None
-
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
deleted file mode 100644
index b325d48fcb..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
-import org.apache.spark.sql.execution.streaming.StreamExecution
-
-/**
- * :: Experimental ::
- * Interface used to start a streaming query query execution.
- *
- * @since 2.0.0
- */
-@Experimental
-final class DataStreamWriter private[sql](df: DataFrame) {
-
- /**
- * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
- *
- * @since 2.0.0
- */
- def format(source: String): DataStreamWriter = {
- this.source = source
- this
- }
-
- /**
- * Adds an output option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: String): DataStreamWriter = {
- this.extraOptions += (key -> value)
- this
- }
-
- /**
- * (Scala-specific) Adds output options for the underlying data source.
- *
- * @since 2.0.0
- */
- def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
- this.extraOptions ++= options
- this
- }
-
- /**
- * Adds output options for the underlying data source.
- *
- * @since 2.0.0
- */
- def options(options: java.util.Map[String, String]): DataStreamWriter = {
- this.options(options.asScala)
- this
- }
-
- /**
- * Partitions the output by the given columns on the file system. If specified, the output is
- * laid out on the file system similar to Hive's partitioning scheme.\
- * @since 2.0.0
- */
- @scala.annotation.varargs
- def partitionBy(colNames: String*): DataStreamWriter = {
- this.partitioningColumns = colNames
- this
- }
-
- /**
- * Starts the execution of the streaming query, which will continually output results to the given
- * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
- * the stream.
- * @since 2.0.0
- */
- def start(path: String): ContinuousQuery = {
- this.extraOptions += ("path" -> path)
- start()
- }
-
- /**
- * Starts the execution of the streaming query, which will continually output results to the given
- * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
- * the stream.
- *
- * @since 2.0.0
- */
- def start(): ContinuousQuery = {
- val sink = ResolvedDataSource.createSink(
- df.sqlContext,
- source,
- extraOptions.toMap,
- normalizedParCols)
-
- new StreamExecution(df.sqlContext, df.logicalPlan, sink)
- }
-
- private def normalizedParCols: Seq[String] = {
- partitioningColumns.map { col =>
- df.logicalPlan.output
- .map(_.name)
- .find(df.sqlContext.analyzer.resolver(_, col))
- .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
- s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
- }
- }
-
- ///////////////////////////////////////////////////////////////////////////////////////
- // Builder pattern config options
- ///////////////////////////////////////////////////////////////////////////////////////
-
- private var source: String = df.sqlContext.conf.defaultDataSourceName
-
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
-
- private var partitioningColumns: Seq[String] = Nil
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 13700be068..1661fdbec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -579,10 +579,9 @@ class SQLContext private[sql](
DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
}
-
/**
* :: Experimental ::
- * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
+ * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
* {{{
* sqlContext.read.parquet("/path/to/file.parquet")
* sqlContext.read.schema(schema).json("/path/to/file.json")
@@ -594,14 +593,6 @@ class SQLContext private[sql](
@Experimental
def read: DataFrameReader = new DataFrameReader(this)
-
- /**
- * :: Experimental ::
- * Returns a [[DataStreamReader]] than can be used to access data continuously as it arrives.
- */
- @Experimental
- def streamFrom: DataStreamReader = new DataStreamReader(this)
-
/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index e3065ac5f8..7702f535ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -122,7 +122,6 @@ object ResolvedDataSource extends Logging {
provider.createSink(sqlContext, options, partitionColumns)
}
-
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
sqlContext: SQLContext,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
index 1dab6ebf1b..b36b41cac9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
@@ -60,22 +60,22 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
import testImplicits._
test("resolve default source") {
- sqlContext.streamFrom
+ sqlContext.read
.format("org.apache.spark.sql.streaming.test")
- .open()
- .streamTo
+ .stream()
+ .write
.format("org.apache.spark.sql.streaming.test")
- .start()
+ .stream()
.stop()
}
test("resolve full class") {
- sqlContext.streamFrom
+ sqlContext.read
.format("org.apache.spark.sql.streaming.test.DefaultSource")
- .open()
- .streamTo
+ .stream()
+ .write
.format("org.apache.spark.sql.streaming.test")
- .start()
+ .stream()
.stop()
}
@@ -83,12 +83,12 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
val map = new java.util.HashMap[String, String]
map.put("opt3", "3")
- val df = sqlContext.streamFrom
+ val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
.option("opt1", "1")
.options(Map("opt2" -> "2"))
.options(map)
- .open()
+ .stream()
assert(LastOptions.parameters("opt1") == "1")
assert(LastOptions.parameters("opt2") == "2")
@@ -96,12 +96,12 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
LastOptions.parameters = null
- df.streamTo
+ df.write
.format("org.apache.spark.sql.streaming.test")
.option("opt1", "1")
.options(Map("opt2" -> "2"))
.options(map)
- .start()
+ .stream()
.stop()
assert(LastOptions.parameters("opt1") == "1")
@@ -110,54 +110,53 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
}
test("partitioning") {
- val df = sqlContext.streamFrom
+ val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
- .open()
+ .stream()
- df.streamTo
+ df.write
.format("org.apache.spark.sql.streaming.test")
- .start()
+ .stream()
.stop()
assert(LastOptions.partitionColumns == Nil)
- df.streamTo
+ df.write
.format("org.apache.spark.sql.streaming.test")
.partitionBy("a")
- .start()
+ .stream()
.stop()
assert(LastOptions.partitionColumns == Seq("a"))
-
withSQLConf("spark.sql.caseSensitive" -> "false") {
- df.streamTo
+ df.write
.format("org.apache.spark.sql.streaming.test")
.partitionBy("A")
- .start()
+ .stream()
.stop()
assert(LastOptions.partitionColumns == Seq("a"))
}
intercept[AnalysisException] {
- df.streamTo
+ df.write
.format("org.apache.spark.sql.streaming.test")
.partitionBy("b")
- .start()
+ .stream()
.stop()
}
}
test("stream paths") {
- val df = sqlContext.streamFrom
+ val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
- .open("/test")
+ .stream("/test")
assert(LastOptions.parameters("path") == "/test")
LastOptions.parameters = null
- df.streamTo
+ df.write
.format("org.apache.spark.sql.streaming.test")
- .start("/test")
+ .stream("/test")
.stop()
assert(LastOptions.parameters("path") == "/test")