about | summary | refs | log | tree | commit | diff
path: root/sql
diff options
context:
space:
mode:
author: uncleGen <hustyugm@gmail.com> 2017-03-08 23:23:10 -0800
committer: Shixiong Zhu <shixiong@databricks.com> 2017-03-08 23:23:10 -0800
commit: eeb1d6db878641d9eac62d0869a90fe80c1f4461 (patch)
tree: 5738191de87c5f212c7033a3dbac85153474aed3 /sql
parent: 029e40b412e332c9f0fff283d604e203066c78c0 (diff)
download: spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.tar.gz
download: spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.tar.bz2
download: spark-eeb1d6db878641d9eac62d0869a90fe80c1f4461.zip
[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
## What changes were proposed in this pull request?

A follow up to SPARK-19859:

- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17221 from uncleGen/SPARK-19859.
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala | 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala | 19
2 files changed, 19 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 62f68a6d7b..06196b5afb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval
object EventTimeWatermark {
/** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
val delayKey = "spark.watermarkDelayMs"
+
+ def getDelayMs(delay: CalendarInterval): Long = {
+ // We define month as `31 days` to simplify calculation.
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ delay.milliseconds + delay.months * millisPerMonth
+ }
}
/**
@@ -37,9 +43,10 @@ case class EventTimeWatermark(
// Update the metadata on the eventTime column to include the desired delay.
override val output: Seq[Attribute] = child.output.map { a =>
if (a semanticEquals eventTime) {
+ val delayMs = EventTimeWatermark.getDelayMs(delay)
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+ .putLong(EventTimeWatermark.delayKey, delayMs)
.build()
a.withMetadata(updatedMetadata)
} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 5a9a99e111..25cf609fc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,10 +84,7 @@ case class EventTimeWatermarkExec(
child: SparkPlan) extends SparkPlan {
val eventTimeStats = new EventTimeStatsAccum()
- val delayMs = {
- val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
- delay.milliseconds + delay.months * millisPerMonth
- }
+ val delayMs = EventTimeWatermark.getDelayMs(delay)
sparkContext.register(eventTimeStats)
@@ -105,10 +102,16 @@ case class EventTimeWatermarkExec(
override val output: Seq[Attribute] = child.output.map { a =>
if (a semanticEquals eventTime) {
val updatedMetadata = new MetadataBuilder()
- .withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delayMs)
- .build()
-
+ .withMetadata(a.metadata)
+ .putLong(EventTimeWatermark.delayKey, delayMs)
+ .build()
+ a.withMetadata(updatedMetadata)
+ } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+ // Remove existing watermark
+ val updatedMetadata = new MetadataBuilder()
+ .withMetadata(a.metadata)
+ .remove(EventTimeWatermark.delayKey)
+ .build()
a.withMetadata(updatedMetadata)
} else {
a