aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 16edb35b1d..0a4d3a93a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -563,7 +563,7 @@ class Dataset[T] private[sql](
* @param eventTime the name of the column that contains the event time of the row.
* @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
* record that has been processed in the form of an interval
- * (e.g. "1 minute" or "5 hours").
+ * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
*
* @group streaming
* @since 2.1.0
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+ require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+ s"delay threshold ($delayThreshold) should not be negative.")
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
}