aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala14
1 files changed, 14 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c34d119734..c768525bc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode._
@@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
)
}
+ test("the new watermark should override the old one") {
+ val df = MemoryStream[(Long, Long)].toDF()
+ .withColumn("first", $"_1".cast("timestamp"))
+ .withColumn("second", $"_2".cast("timestamp"))
+ .withWatermark("first", "1 minute")
+ .withWatermark("second", "2 minutes")
+
+ val eventTimeColumns = df.logicalPlan.output
+ .filter(_.metadata.contains(EventTimeWatermark.delayKey))
+ assert(eventTimeColumns.size === 1)
+ assert(eventTimeColumns(0).name === "second")
+ }
+
private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)