diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-06-23 10:46:20 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-06-23 10:46:20 -0700 |
commit | d85bb10ce49926b8b661bd2cb97392205742fc14 (patch) | |
tree | 6977154bc227a2c9b742ef65d4b948eb7344acd1 /sql/core | |
parent | b5a997667f4c0e514217da6df5af37b8b849dfdf (diff) | |
download | spark-d85bb10ce49926b8b661bd2cb97392205742fc14.tar.gz spark-d85bb10ce49926b8b661bd2cb97392205742fc14.tar.bz2 spark-d85bb10ce49926b8b661bd2cb97392205742fc14.zip |
[SPARK-16116][SQL] ConsoleSink should not require checkpointLocation
## What changes were proposed in this pull request?
When the user uses `ConsoleSink`, we should use a temp location if `checkpointLocation` is not specified.
## How was this patch tested?
The added unit test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13817 from zsxwing/console-checkpoint.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 8 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala | 10 |
2 files changed, 18 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 197707404e..d4b0a3cca2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -272,6 +272,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { useTempCheckpointLocation = true, trigger = trigger) } else { + val (useTempCheckpointLocation, recoverFromCheckpointLocation) = + if (source == "console") { + (true, false) + } else { + (false, true) + } val dataSource = DataSource( df.sparkSession, @@ -284,6 +290,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { df, dataSource.createSink(outputMode), outputMode, + useTempCheckpointLocation = useTempCheckpointLocation, + recoverFromCheckpointLocation = recoverFromCheckpointLocation, trigger = trigger) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 943e7b761e..f099439581 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -457,4 +457,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { } } } + + test("ConsoleSink should not require checkpointLocation") { + LastOptions.clear() + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val sq = df.writeStream.format("console").start() + sq.stop() + } } |