aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/test/scala/org/apache/spark
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-03-21 21:27:08 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-03-21 21:27:08 -0700
commitc1e87e384d1878308b42da80bb3d65be512aab55 (patch)
treeec8dd098b4e3daffa0cddfc786d9eb45f0ce05e6 /sql/catalyst/src/test/scala/org/apache/spark
parent2d73fcced0492c606feab8fe84f62e8318ebcaa1 (diff)
downloadspark-c1e87e384d1878308b42da80bb3d65be512aab55.tar.gz
spark-c1e87e384d1878308b42da80bb3d65be512aab55.tar.bz2
spark-c1e87e384d1878308b42da80bb3d65be512aab55.zip
[SPARK-20030][SS] Event-time-based timeout for MapGroupsWithState
## What changes were proposed in this pull request? Adding event time based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. The keys times out when the watermark crosses the timeout timestamp. ## How was this patch tested? Unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17361 from tdas/SPARK-20030.
Diffstat (limited to 'sql/catalyst/src/test/scala/org/apache/spark')
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala16
1 files changed, 16 insertions, 0 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 08216e2660..8f0a0c0d99 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -345,6 +345,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates"))
+ // mapGroupsWithState with event time timeout + watermark
+ assertNotSupportedInStreamingPlan(
+ "mapGroupsWithState - mapGroupsWithState with event time timeout without watermark",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true,
+ EventTimeTimeout, streamRelation),
+ outputMode = Update,
+ expectedMsgs = Seq("watermark"))
+
+ assertSupportedInStreamingPlan(
+ "mapGroupsWithState - mapGroupsWithState with event time timeout with watermark",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true,
+ EventTimeTimeout, new TestStreamingRelation(attributeWithWatermark)),
+ outputMode = Update)
+
// Deduplicate
assertSupportedInStreamingPlan(
"Deduplicate - Deduplicate on streaming relation before aggregation",