author    Reynold Xin <rxin@databricks.com>    2016-11-01 23:37:03 -0700
committer Reynold Xin <rxin@databricks.com>    2016-11-01 23:37:03 -0700
commit    a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1 (patch)
tree      79c1791a25070585110d092d4c586effba719fa9 /sql/core
parent    abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b (diff)
[SPARK-18192] Support all file formats in structured streaming
## What changes were proposed in this pull request?

This patch adds support for all file formats in structured streaming sinks. This is actually a very small change, thanks to all the previous refactoring done using the new internal commit protocol API.

## How was this patch tested?

Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin <rxin@databricks.com>

Closes #15711 from rxin/SPARK-18192.
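As a rough illustration of what this enables from the user-facing API (not part of the patch; the paths and the use of a text source here are assumptions), a streaming query can now write to a json, text, or parquet file sink, or any other FileFormat implementation, where previously only parquet was matched:

```scala
import org.apache.spark.sql.SparkSession

object JsonSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("json-sink-sketch").getOrCreate()

    // Stream text files from an (assumed) input directory; the "text" source
    // yields a single string column named "value".
    val input = spark.readStream.format("text").load("/tmp/stream-input")

    // With this change, any FileFormat-based format name is accepted by the
    // streaming sink, not just "parquet".
    val query = input.writeStream
      .format("json")
      .option("checkpointLocation", "/tmp/stream-checkpoint")
      .start("/tmp/stream-output")

    query.awaitTermination()
  }
}
```

The DataSource hunk below keeps the existing output-mode check, so Append remains the only output mode accepted by the file sink; no explicit outputMode call is needed for a query like the one above.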
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala      8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala        62
2 files changed, 32 insertions(+), 38 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d980e6a15a..3f956c4276 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -37,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
case s: StreamSinkProvider =>
s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
- case parquet: parquet.ParquetFileFormat =>
+ case fileFormat: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@ case class DataSource(
throw new IllegalArgumentException(
s"Data source $className does not support $outputMode output mode")
}
- new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
+ new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
case _ =>
throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
- s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+ s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05344..0f140f94f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.streaming
-import org.apache.spark.sql._
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -142,42 +142,38 @@ class FileStreamSinkSuite extends StreamTest {
}
}
- test("FileStreamSink - supported formats") {
- def testFormat(format: Option[String]): Unit = {
- val inputData = MemoryStream[Int]
- val ds = inputData.toDS()
+ test("FileStreamSink - parquet") {
+ testFormat(None) // should not throw an error; the format defaults to parquet when not specified
+ testFormat(Some("parquet"))
+ }
- val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
- val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+ test("FileStreamSink - text") {
+ testFormat(Some("text"))
+ }
- var query: StreamingQuery = null
+ test("FileStreamSink - json") {
+ testFormat(Some("text"))
+ }
- try {
- val writer =
- ds.map(i => (i, i * 1000))
- .toDF("id", "value")
- .writeStream
- if (format.nonEmpty) {
- writer.format(format.get)
- }
- query = writer
- .option("checkpointLocation", checkpointDir)
- .start(outputDir)
- } finally {
- if (query != null) {
- query.stop()
- }
- }
- }
+ def testFormat(format: Option[String]): Unit = {
+ val inputData = MemoryStream[Int]
+ val ds = inputData.toDS()
- testFormat(None) // should not throw error as default format parquet when not specified
- testFormat(Some("parquet"))
- val e = intercept[UnsupportedOperationException] {
- testFormat(Some("text"))
- }
- Seq("text", "not support", "stream").foreach { s =>
- assert(e.getMessage.contains(s))
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: StreamingQuery = null
+
+ try {
+ val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
+ if (format.nonEmpty) {
+ writer.format(format.get)
+ }
+ query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
}
}
-
}