author Reynold Xin <rxin@databricks.com> 2016-11-01 23:37:03 -0700
committer Reynold Xin <rxin@databricks.com> 2016-11-01 23:37:03 -0700
commit a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1 (patch)
tree 79c1791a25070585110d092d4c586effba719fa9 /sql/core/src/test
parent abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b (diff)
[SPARK-18192] Support all file formats in structured streaming
## What changes were proposed in this pull request?

This patch adds support for all file formats in structured streaming sinks. This is actually a very small change thanks to all the previous refactoring done using the new internal commit protocol API.

## How was this patch tested?

Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin <rxin@databricks.com>

Closes #15711 from rxin/SPARK-18192.
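For context (not part of the patch), below is a minimal sketch of the user-facing behaviour this change enables: writing a streaming query to a file sink in a non-parquet format such as json or text. The rate source (available in later Spark releases), the output and checkpoint paths, and the timeout are illustrative assumptions only.

```scala
import org.apache.spark.sql.SparkSession

object FileSinkFormatsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FileSinkFormatsExample")
      .master("local[2]")
      .getOrCreate()

    // A toy streaming source for illustration; the rate source emits (timestamp, value) rows.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Write the stream to a file sink. Before this patch, only parquet was accepted here;
    // with it, json and text work the same way, and omitting .format() still falls back
    // to the default (parquet).
    val query = stream
      .selectExpr("CAST(value AS STRING) AS value") // the text sink needs a single string column
      .writeStream
      .format("json") // or "text" / "parquet"
      .option("checkpointLocation", "/tmp/example-checkpoint")
      .start("/tmp/example-output")

    query.awaitTermination(10000) // run briefly for illustration
    query.stop()
    spark.stop()
  }
}
```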
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala  62
1 file changed, 29 insertions, 33 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05344..0f140f94f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.streaming
-import org.apache.spark.sql._
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -142,42 +142,38 @@ class FileStreamSinkSuite extends StreamTest {
}
}
- test("FileStreamSink - supported formats") {
- def testFormat(format: Option[String]): Unit = {
- val inputData = MemoryStream[Int]
- val ds = inputData.toDS()
+ test("FileStreamSink - parquet") {
+ testFormat(None) // should not throw an error; the default format (parquet) is used when none is specified
+ testFormat(Some("parquet"))
+ }
- val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
- val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+ test("FileStreamSink - text") {
+ testFormat(Some("text"))
+ }
- var query: StreamingQuery = null
+ test("FileStreamSink - json") {
+ testFormat(Some("text"))
+ }
- try {
- val writer =
- ds.map(i => (i, i * 1000))
- .toDF("id", "value")
- .writeStream
- if (format.nonEmpty) {
- writer.format(format.get)
- }
- query = writer
- .option("checkpointLocation", checkpointDir)
- .start(outputDir)
- } finally {
- if (query != null) {
- query.stop()
- }
- }
- }
+ def testFormat(format: Option[String]): Unit = {
+ val inputData = MemoryStream[Int]
+ val ds = inputData.toDS()
- testFormat(None) // should not throw error as default format parquet when not specified
- testFormat(Some("parquet"))
- val e = intercept[UnsupportedOperationException] {
- testFormat(Some("text"))
- }
- Seq("text", "not support", "stream").foreach { s =>
- assert(e.getMessage.contains(s))
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: StreamingQuery = null
+
+ try {
+ val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
+ if (format.nonEmpty) {
+ writer.format(format.get)
+ }
+ query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
}
}
-
}