aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-03-07 20:34:55 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-03-07 20:34:55 -0800
commitd8830c5039d9c7c5ef03631904c32873ab558e22 (patch)
tree472cc22a9d66d4d9f22c5b84affaf05a0b8fdde3 /sql/catalyst
parentca849ac4e8fc520a4a12949b62b9730c5dfa097d (diff)
downloadspark-d8830c5039d9c7c5ef03631904c32873ab558e22.tar.gz
spark-d8830c5039d9c7c5ef03631904c32873ab558e22.tar.bz2
spark-d8830c5039d9c7c5ef03631904c32873ab558e22.zip
[SPARK-19859][SS] The new watermark should override the old one
## What changes were proposed in this pull request? The new watermark should override the old one. Otherwise, we just pick up the first column which has a watermark, it may be unexpected. ## How was this patch tested? The new test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #17199 from zsxwing/SPARK-19859.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala7
1 files changed, 7 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 77309ce391..62f68a6d7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -42,6 +42,13 @@ case class EventTimeWatermark(
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
.build()
a.withMetadata(updatedMetadata)
+ } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+ // Remove existing watermark
+ val updatedMetadata = new MetadataBuilder()
+ .withMetadata(a.metadata)
+ .remove(EventTimeWatermark.delayKey)
+ .build()
+ a.withMetadata(updatedMetadata)
} else {
a
}