about | summary | refs | log | tree | commit | diff
path: root/sql
diff options
context:
space:
mode:
author: Shixiong Zhu <shixiong@databricks.com> — 2016-06-23 10:46:20 -0700
committer: Shixiong Zhu <shixiong@databricks.com> — 2016-06-23 10:46:20 -0700
commitd85bb10ce49926b8b661bd2cb97392205742fc14 (patch)
tree6977154bc227a2c9b742ef65d4b948eb7344acd1 /sql
parentb5a997667f4c0e514217da6df5af37b8b849dfdf (diff)
downloadspark-d85bb10ce49926b8b661bd2cb97392205742fc14.tar.gz
spark-d85bb10ce49926b8b661bd2cb97392205742fc14.tar.bz2
spark-d85bb10ce49926b8b661bd2cb97392205742fc14.zip
[SPARK-16116][SQL] ConsoleSink should not require checkpointLocation
## What changes were proposed in this pull request?

When the user uses `ConsoleSink`, we should use a temp location if `checkpointLocation` is not specified.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13817 from zsxwing/console-checkpoint.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala | 10
2 files changed, 18 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 197707404e..d4b0a3cca2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -272,6 +272,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
useTempCheckpointLocation = true,
trigger = trigger)
} else {
+ val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
+ if (source == "console") {
+ (true, false)
+ } else {
+ (false, true)
+ }
val dataSource =
DataSource(
df.sparkSession,
@@ -284,6 +290,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
df,
dataSource.createSink(outputMode),
outputMode,
+ useTempCheckpointLocation = useTempCheckpointLocation,
+ recoverFromCheckpointLocation = recoverFromCheckpointLocation,
trigger = trigger)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 943e7b761e..f099439581 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -457,4 +457,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
}
}
+
+ test("ConsoleSink should not require checkpointLocation") {
+ LastOptions.clear()
+ val df = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+
+ val sq = df.writeStream.format("console").start()
+ sq.stop()
+ }
}