diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-03-07 20:34:55 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-07 20:34:55 -0800 |
commit | d8830c5039d9c7c5ef03631904c32873ab558e22 (patch) | |
tree | 472cc22a9d66d4d9f22c5b84affaf05a0b8fdde3 | |
parent | ca849ac4e8fc520a4a12949b62b9730c5dfa097d (diff) | |
download | spark-d8830c5039d9c7c5ef03631904c32873ab558e22.tar.gz spark-d8830c5039d9c7c5ef03631904c32873ab558e22.tar.bz2 spark-d8830c5039d9c7c5ef03631904c32873ab558e22.zip |
[SPARK-19859][SS] The new watermark should override the old one
## What changes were proposed in this pull request?
The new watermark should override the old one. Otherwise, we just pick up the first column which has a watermark, it may be unexpected.
## How was this patch tested?
The new test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #17199 from zsxwing/SPARK-19859.
2 files changed, 21 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 77309ce391..62f68a6d7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -42,6 +42,13 @@ case class EventTimeWatermark( .putLong(EventTimeWatermark.delayKey, delay.milliseconds) .build() a.withMetadata(updatedMetadata) + } else if (a.metadata.contains(EventTimeWatermark.delayKey)) { + // Remove existing watermark + val updatedMetadata = new MetadataBuilder() + .withMetadata(a.metadata) + .remove(EventTimeWatermark.delayKey) + .build() + a.withMetadata(updatedMetadata) } else { a } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index c34d119734..c768525bc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.streaming.OutputMode._ @@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + test("the new watermark should override the old one") { + val df = MemoryStream[(Long, Long)].toDF() + .withColumn("first", $"_1".cast("timestamp")) + .withColumn("second", $"_2".cast("timestamp")) + .withWatermark("first", "1 minute") + .withWatermark("second", "2 minutes") + + val eventTimeColumns = df.logicalPlan.output + .filter(_.metadata.contains(EventTimeWatermark.delayKey)) + assert(eventTimeColumns.size === 1) + assert(eventTimeColumns(0).name === "second") + } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows) |