| author | Burak Yavuz <brkyvz@gmail.com> | 2016-12-20 14:19:35 -0800 |
|---|---|---|
| committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-20 14:19:35 -0800 |
| commit | caed89321fdabe83e46451ca4e968f86481ad500 | |
| tree | 849c87ac5f9558671a868133ad1c99f66fb328fa /sql/core/src/main | |
| parent | 95c95b71ed31b2971475aec6d7776dc234845d0a | |
[SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf
## What changes were proposed in this pull request?
A checkpoint location can be defined for a Structured Streaming query on a per-query basis through the `DataStreamWriter` options, but it can also be provided through SparkSession configurations. When the OutputMode is Complete, a MemorySink should be able to recover from the checkpoint in both cases.
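For illustration, the two ways of supplying a checkpoint location can be sketched as follows. This is a hedged sketch, not code from the patch: `spark`, `streamingDf`, the paths, and the query names are hypothetical stand-ins; `spark.sql.streaming.checkpointLocation` is the session-level setting being referred to.

```scala
// Per-query: the location goes into the DataStreamWriter options.
val q1 = streamingDf.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("perQueryChk")
  .option("checkpointLocation", "/tmp/chk-per-query")  // hypothetical path
  .start()

// Session-wide: the location comes from the SparkSession configuration
// and no per-query option is set. This is the case SPARK-18927 fixes.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/chk-session")
val q2 = streamingDf.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("sessionChk")
  .start()
```

Before this patch, only the first form allowed a Complete-mode memory sink to recover from its checkpoint.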
## How was this patch tested?
Unit tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #16342 from brkyvz/chk-rec.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala | 2 |
1 file changed, 1 insertion(+), 1 deletion(-)
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b3c600ae53..b7fc336223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
-      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
+      val recoverFromChkpoint = outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         chkpointLoc,
```
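The one-line change removes the requirement that the checkpoint location appear in the per-query options: recovery is now attempted whenever the output mode is Complete, regardless of whether the location came from `DataStreamWriter` options or the session configuration. The predicate change can be sketched in standalone Scala (hypothetical helper names, not the Spark source):

```scala
// Before the fix: recovery only happened when the checkpoint location
// was present in the per-query options map.
def recoverBefore(chkpointLoc: Option[String], isCompleteMode: Boolean): Boolean =
  chkpointLoc.isDefined && isCompleteMode

// After the fix: Complete output mode alone enables recovery; the
// checkpoint location may still be resolved later from the SessionConf.
def recoverAfter(isCompleteMode: Boolean): Boolean =
  isCompleteMode
```

When the location is set only in the session configuration, `chkpointLoc` is `None` at this point in `DataStreamWriter`, so the old predicate was always false and recovery was silently skipped.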