diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-21 21:27:08 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-21 21:27:08 -0700 |
commit | c1e87e384d1878308b42da80bb3d65be512aab55 (patch) | |
tree | ec8dd098b4e3daffa0cddfc786d9eb45f0ce05e6 /sql/catalyst/src/test/scala/org/apache/spark | |
parent | 2d73fcced0492c606feab8fe84f62e8318ebcaa1 (diff) | |
download | spark-c1e87e384d1878308b42da80bb3d65be512aab55.tar.gz spark-c1e87e384d1878308b42da80bb3d65be512aab55.tar.bz2 spark-c1e87e384d1878308b42da80bb3d65be512aab55.zip |
[SPARK-20030][SS] Event-time-based timeout for MapGroupsWithState
## What changes were proposed in this pull request?
Adding event time based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. The keys times out when the watermark crosses the timeout timestamp.
## How was this patch tested?
Unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17361 from tdas/SPARK-20030.
Diffstat (limited to 'sql/catalyst/src/test/scala/org/apache/spark')
-rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 08216e2660..8f0a0c0d99 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -345,6 +345,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Append, expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates")) + // mapGroupsWithState with event time timeout + watermark + assertNotSupportedInStreamingPlan( + "mapGroupsWithState - mapGroupsWithState with event time timeout without watermark", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, + EventTimeTimeout, streamRelation), + outputMode = Update, + expectedMsgs = Seq("watermark")) + + assertSupportedInStreamingPlan( + "mapGroupsWithState - mapGroupsWithState with event time timeout with watermark", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, + EventTimeTimeout, new TestStreamingRelation(attributeWithWatermark)), + outputMode = Update) + // Deduplicate assertSupportedInStreamingPlan( "Deduplicate - Deduplicate on streaming relation before aggregation", |