aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-12-20 14:19:35 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-12-20 14:19:35 -0800
commitcaed89321fdabe83e46451ca4e968f86481ad500 (patch)
tree849c87ac5f9558671a868133ad1c99f66fb328fa /sql/core/src/main
parent95c95b71ed31b2971475aec6d7776dc234845d0a (diff)
downloadspark-caed89321fdabe83e46451ca4e968f86481ad500.tar.gz
spark-caed89321fdabe83e46451ca4e968f86481ad500.tar.bz2
spark-caed89321fdabe83e46451ca4e968f86481ad500.zip
[SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf
## What changes were proposed in this pull request? Checkpoint Location can be defined for a StructuredStreaming on a per-query basis by the `DataStreamWriter` options, but it can also be provided through SparkSession configurations. It should be able to recover in both cases when the OutputMode is Complete for MemorySinks. ## How was this patch tested? Unit tests Author: Burak Yavuz <brkyvz@gmail.com> Closes #16342 from brkyvz/chk-rec.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b3c600ae53..b7fc336223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
val chkpointLoc = extraOptions.get("checkpointLocation")
- val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
+ val recoverFromChkpoint = outputMode == OutputMode.Complete()
val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
chkpointLoc,