author     uncleGen <hustyugm@gmail.com>             2017-03-09 11:07:31 -0800
committer  Shixiong Zhu <shixiong@databricks.com>    2017-03-09 11:07:31 -0800
commit     30b18e69361746b4d656474374d8b486bb48a19e (patch)
tree       14448ffb7e2e507892d85bba321a5b01d39ef94c /sql
parent     40da4d181d648308de85fdcabc5c098ee861949a (diff)
[SPARK-19861][SS] watermark should not be a negative time.
## What changes were proposed in this pull request?

`watermark` should not be negative. A negative delay threshold is invalid, so check it before the query actually runs.

## How was this patch tested?

Added a new unit test.

Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>

Closes #17202 from uncleGen/SPARK-19861.
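The snippet below is a hedged usage sketch (not part of the patch) of how the new behavior surfaces to a caller: the negative threshold is rejected eagerly when `withWatermark` is called, before any query runs. It assumes a local `SparkSession` and uses Spark's `MemoryStream` test source, mirroring the new unit test further down; the session settings and column names are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// Minimal local session; master/app name are illustrative assumptions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("watermark-negative-check")
  .getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext  // required by MemoryStream

// A streaming DataFrame with a timestamp column to attach the watermark to.
val inputData = MemoryStream[Int].toDF()
  .withColumn("eventTime", $"value".cast("timestamp"))

// Accepted: a non-negative delay threshold.
val withMark = inputData.withWatermark("eventTime", "10 seconds")

// Rejected immediately by the new require(...) check:
//   java.lang.IllegalArgumentException: requirement failed:
//   delay threshold (-10 seconds) should not be negative.
// inputData.withWatermark("eventTime", "-10 seconds")
```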
Diffstat (limited to 'sql')
-rw-r--r--    sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                              4
-rw-r--r--    sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala    23
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 16edb35b1d..0a4d3a93a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -563,7 +563,7 @@ class Dataset[T] private[sql](
    * @param eventTime the name of the column that contains the event time of the row.
    * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
    *                       record that has been processed in the form of an interval
-   *                       (e.g. "1 minute" or "5 hours").
+   *                       (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
    *
    * @group streaming
    * @since 2.1.0
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
     val parsedDelay =
       Option(CalendarInterval.fromString("interval " + delayThreshold))
         .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+      s"delay threshold ($delayThreshold) should not be negative.")
     EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
   }
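For context on why mixed-sign strings like "1 year -13 months" trip this check: the parsed interval reduces to a months component and a milliseconds component, so 1 year -13 months nets out to -1 month. The sketch below is a rough, hypothetical stand-in (the `Delay` case class and `parseDelay` helper are invented for illustration, not Spark's actual `CalendarInterval` code) showing the arithmetic behind the new `require`.

```scala
object DelayCheckSketch extends App {
  // Hypothetical stand-in for Spark's CalendarInterval: the parsed delay is
  // reduced to a months component plus a milliseconds component, and either
  // one can end up negative once mixed-sign units are summed.
  case class Delay(months: Int, milliseconds: Long)

  // Illustrative parser covering only the units used in the new test cases.
  def parseDelay(s: String): Delay = {
    val token = """(-?\d+)\s+(years?|months?|days?|seconds?)""".r
    token.findAllMatchIn(s).foldLeft(Delay(0, 0L)) { (acc, m) =>
      val n = m.group(1).toLong
      m.group(2) match {
        case u if u.startsWith("year")   => acc.copy(months = acc.months + 12 * n.toInt)
        case u if u.startsWith("month")  => acc.copy(months = acc.months + n.toInt)
        case u if u.startsWith("day")    => acc.copy(milliseconds = acc.milliseconds + n * 86400000L)
        case u if u.startsWith("second") => acc.copy(milliseconds = acc.milliseconds + n * 1000L)
        case _                           => acc
      }
    }
  }

  // Mirrors the require(...) added in the hunk above.
  def checkDelay(delayThreshold: String): Unit = {
    val d = parseDelay(delayThreshold)
    require(d.milliseconds >= 0 && d.months >= 0,
      s"delay threshold ($delayThreshold) should not be negative.")
  }

  checkDelay("10 seconds")           // passes
  // checkDelay("1 year -13 months") // fails: 12 - 13 = -1 month
  // checkDelay("1 month -40 days")  // fails: the milliseconds component is negative
}
```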
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c768525bc6..7614ea5eb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  test("delay threshold should not be negative.") {
+    val inputData = MemoryStream[Int].toDF()
+    var e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "-1 year")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "1 year -13 months")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "1 month -40 days")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "-10 seconds")
+    }
+    assert(e.getMessage contains "should not be negative.")
+  }
+
   test("the new watermark should override the old one") {
     val df = MemoryStream[(Long, Long)].toDF()
       .withColumn("first", $"_1".cast("timestamp"))