author | uncleGen <hustyugm@gmail.com> | 2017-03-09 11:07:31 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-09 11:07:31 -0800 |
commit | 30b18e69361746b4d656474374d8b486bb48a19e (patch) | |
tree | 14448ffb7e2e507892d85bba321a5b01d39ef94c /sql/core/src/main/scala | |
parent | 40da4d181d648308de85fdcabc5c098ee861949a (diff) | |
[SPARK-19861][SS] watermark should not be a negative time.
## What changes were proposed in this pull request?
The `watermark` delay threshold should not be negative. A negative value is invalid, so check for it before the query actually runs.
## How was this patch tested?
Added a new unit test.
Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>
Closes #17202 from uncleGen/SPARK-19861.
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 16edb35b1d..0a4d3a93a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -563,7 +563,7 @@ class Dataset[T] private[sql](
    * @param eventTime the name of the column that contains the event time of the row.
    * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
    *                       record that has been processed in the form of an interval
-   *                       (e.g. "1 minute" or "5 hours").
+   *                       (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
    *
    * @group streaming
    * @since 2.1.0
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
     val parsedDelay =
       Option(CalendarInterval.fromString("interval " + delayThreshold))
         .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+      s"delay threshold ($delayThreshold) should not be negative.")
     EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
   }
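
The check added in this patch can be tried outside Spark with a minimal sketch like the one below. It is not Spark code: `Interval`, `DelayParseException`, `parseDelay`, `checkedDelay`, and `describe` are hypothetical stand-ins introduced only for illustration, and the tiny parser accepts just `"<integer> <unit>"` strings, which is far less than `CalendarInterval.fromString` handles. Only the `require(...)` condition and its message mirror the diff above.

```scala
import scala.util.Try

// Hypothetical stand-in for Spark's CalendarInterval: only the two values the check reads.
final case class Interval(months: Int, milliseconds: Long)

// Hypothetical stand-in for the AnalysisException thrown on unparseable input.
final class DelayParseException(message: String) extends Exception(message)

object WatermarkDelayCheck {
  // Matches e.g. "1 minute", "5 hours", "-2 months"; an optional trailing "s" is dropped.
  private val Pattern = """\s*(-?\d+)\s+([a-z]+?)s?\s*""".r

  // Simplified parser (an assumption, not Spark's grammar): one signed integer and one unit.
  def parseDelay(text: String): Option[Interval] = text.toLowerCase match {
    case Pattern(n, unit) =>
      val amount = n.toLong
      unit match {
        case "second" => Some(Interval(0, amount * 1000L))
        case "minute" => Some(Interval(0, amount * 60000L))
        case "hour"   => Some(Interval(0, amount * 3600000L))
        case "day"    => Some(Interval(0, amount * 86400000L))
        case "month"  => Some(Interval(amount.toInt, 0L))
        case _        => None
      }
    case _ => None
  }

  // Same shape as the patched code: parse first, then reject negative components.
  def checkedDelay(delayThreshold: String): Interval = {
    val parsedDelay = parseDelay(delayThreshold)
      .getOrElse(throw new DelayParseException(s"Unable to parse time delay '$delayThreshold'"))
    // require throws IllegalArgumentException when the condition is false.
    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
      s"delay threshold ($delayThreshold) should not be negative.")
    parsedDelay
  }

  // Convenience wrapper that turns the outcome into a readable string.
  def describe(delayThreshold: String): String =
    Try(checkedDelay(delayThreshold))
      .fold(e => s"rejected: ${e.getMessage}", i => s"accepted: $i")
}
```

In this sketch, `WatermarkDelayCheck.describe("1 minute")` reports the delay as accepted, while `"-1 minute"` and `"-2 months"` are rejected with an `IllegalArgumentException` raised by `require`. Both components are tested because the months part and the sub-month part are held separately, so either one can be negative on its own.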