From 30b18e69361746b4d656474374d8b486bb48a19e Mon Sep 17 00:00:00 2001 From: uncleGen Date: Thu, 9 Mar 2017 11:07:31 -0800 Subject: [SPARK-19861][SS] watermark should not be a negative time. ## What changes were proposed in this pull request? `watermark` should not be negative. This behavior is invalid, check it before real run. ## How was this patch tested? add new unit test. Author: uncleGen Author: dylon Closes #17202 from uncleGen/SPARK-19861. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'sql/core/src/main/scala/org/apache') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 16edb35b1d..0a4d3a93a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -563,7 +563,7 @@ class Dataset[T] private[sql]( * @param eventTime the name of the column that contains the event time of the row. * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest * record that has been processed in the form of an interval - * (e.g. "1 minute" or "5 hours"). + * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative. * * @group streaming * @since 2.1.0 @@ -576,6 +576,8 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) + require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, + s"delay threshold ($delayThreshold) should not be negative.") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } -- cgit v1.2.3