diff options
author | uncleGen <hustyugm@gmail.com> | 2017-03-08 23:23:10 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-08 23:23:10 -0800 |
commit | eeb1d6db878641d9eac62d0869a90fe80c1f4461 (patch) | |
tree | 5738191de87c5f212c7033a3dbac85153474aed3 /sql/core/src | |
parent | 029e40b412e332c9f0fff283d604e203066c78c0 (diff) | |
download | spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.tar.gz spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.tar.bz2 spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.zip |
[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
## What changes were proposed in this pull request?
A follow up to SPARK-19859:
- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark
## How was this patch tested?
Jenkins.
Author: uncleGen <hustyugm@gmail.com>
Closes #17221 from uncleGen/SPARK-19859.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 5a9a99e111..25cf609fc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,10 +84,7 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() - val delayMs = { - val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 - delay.milliseconds + delay.months * millisPerMonth - } + val delayMs = EventTimeWatermark.getDelayMs(delay) sparkContext.register(eventTimeStats) @@ -105,10 +102,16 @@ case class EventTimeWatermarkExec( override val output: Seq[Attribute] = child.output.map { a => if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() - .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delayMs) - .build() - + .withMetadata(a.metadata) + .putLong(EventTimeWatermark.delayKey, delayMs) + .build() + a.withMetadata(updatedMetadata) + } else if (a.metadata.contains(EventTimeWatermark.delayKey)) { + // Remove existing watermark + val updatedMetadata = new MetadataBuilder() + .withMetadata(a.metadata) + .remove(EventTimeWatermark.delayKey) + .build() a.withMetadata(updatedMetadata) } else { a |