aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authoruncleGen <hustyugm@gmail.com>2017-03-08 23:23:10 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-03-08 23:23:10 -0800
commiteeb1d6db878641d9eac62d0869a90fe80c1f4461 (patch)
tree5738191de87c5f212c7033a3dbac85153474aed3 /sql/catalyst
parent029e40b412e332c9f0fff283d604e203066c78c0 (diff)
downloadspark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.tar.gz
spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.tar.bz2
spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.zip
[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17221 from uncleGen/SPARK-19859.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala9
1 files changed, 8 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 62f68a6d7b..06196b5afb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval
object EventTimeWatermark {
/** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
val delayKey = "spark.watermarkDelayMs"
+
+ def getDelayMs(delay: CalendarInterval): Long = {
+ // We define month as `31 days` to simplify calculation.
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ delay.milliseconds + delay.months * millisPerMonth
+ }
}
/**
@@ -37,9 +43,10 @@ case class EventTimeWatermark(
// Update the metadata on the eventTime column to include the desired delay.
override val output: Seq[Attribute] = child.output.map { a =>
if (a semanticEquals eventTime) {
+ val delayMs = EventTimeWatermark.getDelayMs(delay)
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+ .putLong(EventTimeWatermark.delayKey, delayMs)
.build()
a.withMetadata(updatedMetadata)
} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {