aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-03-22 12:30:36 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-03-22 12:30:36 -0700
commit82b598b963a21ae9d6a2a9638e86b4165c2a78c9 (patch)
tree86a2f15a15d9642df2ebbc0037a4e91025a04164 /sql/core/src/test/scala/org
parent80fd070389a9c8ffa342d7b11f1ab2ea92e0f562 (diff)
downloadspark-82b598b963a21ae9d6a2a9638e86b4165c2a78c9.tar.gz
spark-82b598b963a21ae9d6a2a9638e86b4165c2a78c9.tar.bz2
spark-82b598b963a21ae9d6a2a9638e86b4165c2a78c9.zip
[SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithState
## What changes were proposed in this pull request? Since the state is tied a "group" in the "mapGroupsWithState" operations, its better to call the state "GroupState" instead of a key. This would make it more general if you extends this operation to RelationGroupedDataset and python APIs. ## How was this patch tested? Existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17385 from tdas/SPARK-20057.
Diffstat (limited to 'sql/core/src/test/scala/org')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala122
1 files changed, 61 insertions, 61 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index fe72283bb6..3dabef6a9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.RDDScanExec
-import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, KeyedStateImpl, MemoryStream}
+import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream}
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StoreUpdate}
import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore
import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -43,16 +43,16 @@ case class Result(key: Long, count: Int)
class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
import testImplicits._
- import KeyedStateImpl._
- import KeyedStateTimeout._
+ import GroupStateImpl._
+ import GroupStateTimeout._
override def afterAll(): Unit = {
super.afterAll()
StateStore.stop()
}
- test("KeyedState - get, exists, update, remove") {
- var state: KeyedStateImpl[String] = null
+ test("GroupState - get, exists, update, remove") {
+ var state: GroupStateImpl[String] = null
def testState(
expectedData: Option[String],
@@ -73,13 +73,13 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
// Updating empty state
- state = new KeyedStateImpl[String](None)
+ state = new GroupStateImpl[String](None)
testState(None)
state.update("")
testState(Some(""), shouldBeUpdated = true)
// Updating exiting state
- state = new KeyedStateImpl[String](Some("2"))
+ state = new GroupStateImpl[String](Some("2"))
testState(Some("2"))
state.update("3")
testState(Some("3"), shouldBeUpdated = true)
@@ -97,19 +97,19 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
}
- test("KeyedState - setTimeout**** with NoTimeout") {
+ test("GroupState - setTimeout**** with NoTimeout") {
for (initState <- Seq(None, Some(5))) {
// for different initial state
- implicit val state = new KeyedStateImpl(initState, 1000, 1000, NoTimeout, hasTimedOut = false)
+ implicit val state = new GroupStateImpl(initState, 1000, 1000, NoTimeout, hasTimedOut = false)
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
}
}
- test("KeyedState - setTimeout**** with ProcessingTimeTimeout") {
- implicit var state: KeyedStateImpl[Int] = null
+ test("GroupState - setTimeout**** with ProcessingTimeTimeout") {
+ implicit var state: GroupStateImpl[Int] = null
- state = new KeyedStateImpl[Int](None, 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false)
+ state = new GroupStateImpl[Int](None, 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false)
assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
testTimeoutDurationNotAllowed[IllegalStateException](state)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
@@ -128,8 +128,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
}
- test("KeyedState - setTimeout**** with EventTimeTimeout") {
- implicit val state = new KeyedStateImpl[Int](
+ test("GroupState - setTimeout**** with EventTimeTimeout") {
+ implicit val state = new GroupStateImpl[Int](
None, 1000, 1000, EventTimeTimeout, hasTimedOut = false)
assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
@@ -148,8 +148,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testTimeoutTimestampNotAllowed[IllegalStateException](state)
}
- test("KeyedState - illegal params to setTimeout****") {
- var state: KeyedStateImpl[Int] = null
+ test("GroupState - illegal params to setTimeout****") {
+ var state: GroupStateImpl[Int] = null
// Test setTimeout****() with illegal values
def testIllegalTimeout(body: => Unit): Unit = {
@@ -157,14 +157,14 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
}
- state = new KeyedStateImpl(Some(5), 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false)
+ state = new GroupStateImpl(Some(5), 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false)
testIllegalTimeout { state.setTimeoutDuration(-1000) }
testIllegalTimeout { state.setTimeoutDuration(0) }
testIllegalTimeout { state.setTimeoutDuration("-2 second") }
testIllegalTimeout { state.setTimeoutDuration("-1 month") }
testIllegalTimeout { state.setTimeoutDuration("1 month -1 day") }
- state = new KeyedStateImpl(Some(5), 1000, 1000, EventTimeTimeout, hasTimedOut = false)
+ state = new GroupStateImpl(Some(5), 1000, 1000, EventTimeTimeout, hasTimedOut = false)
testIllegalTimeout { state.setTimeoutTimestamp(-10000) }
testIllegalTimeout { state.setTimeoutTimestamp(10000, "-3 second") }
testIllegalTimeout { state.setTimeoutTimestamp(10000, "-1 month") }
@@ -175,25 +175,25 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000), "1 month -1 day") }
}
- test("KeyedState - hasTimedOut") {
+ test("GroupState - hasTimedOut") {
for (timeoutConf <- Seq(NoTimeout, ProcessingTimeTimeout, EventTimeTimeout)) {
for (initState <- Seq(None, Some(5))) {
- val state1 = new KeyedStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = false)
+ val state1 = new GroupStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = false)
assert(state1.hasTimedOut === false)
- val state2 = new KeyedStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = true)
+ val state2 = new GroupStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = true)
assert(state2.hasTimedOut === true)
}
}
}
- test("KeyedState - primitive type") {
- var intState = new KeyedStateImpl[Int](None)
+ test("GroupState - primitive type") {
+ var intState = new GroupStateImpl[Int](None)
intercept[NoSuchElementException] {
intState.get
}
assert(intState.getOption === None)
- intState = new KeyedStateImpl[Int](Some(10))
+ intState = new GroupStateImpl[Int](Some(10))
assert(intState.get == 10)
intState.update(0)
assert(intState.get == 0)
@@ -218,21 +218,21 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testStateUpdateWithData(
testName + "no update",
stateUpdates = state => { /* do nothing */ },
- timeoutConf = KeyedStateTimeout.NoTimeout,
+ timeoutConf = GroupStateTimeout.NoTimeout,
priorState = priorState,
expectedState = priorState) // should not change
testStateUpdateWithData(
testName + "state updated",
stateUpdates = state => { state.update(5) },
- timeoutConf = KeyedStateTimeout.NoTimeout,
+ timeoutConf = GroupStateTimeout.NoTimeout,
priorState = priorState,
expectedState = Some(5)) // should change
testStateUpdateWithData(
testName + "state removed",
stateUpdates = state => { state.remove() },
- timeoutConf = KeyedStateTimeout.NoTimeout,
+ timeoutConf = GroupStateTimeout.NoTimeout,
priorState = priorState,
expectedState = None) // should be removed
}
@@ -283,7 +283,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testStateUpdateWithData(
s"ProcessingTimeTimeout - $testName - state and timeout duration updated",
stateUpdates =
- (state: KeyedState[Int]) => { state.update(5); state.setTimeoutDuration(5000) },
+ (state: GroupState[Int]) => { state.update(5); state.setTimeoutDuration(5000) },
timeoutConf = ProcessingTimeTimeout,
priorState = priorState,
priorTimeoutTimestamp = priorTimeoutTimestamp,
@@ -293,7 +293,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testStateUpdateWithData(
s"EventTimeTimeout - $testName - state and timeout timestamp updated",
stateUpdates =
- (state: KeyedState[Int]) => { state.update(5); state.setTimeoutTimestamp(5000) },
+ (state: GroupState[Int]) => { state.update(5); state.setTimeoutTimestamp(5000) },
timeoutConf = EventTimeTimeout,
priorState = priorState,
priorTimeoutTimestamp = priorTimeoutTimestamp,
@@ -303,7 +303,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
testStateUpdateWithData(
s"EventTimeTimeout - $testName - timeout timestamp updated to before watermark",
stateUpdates =
- (state: KeyedState[Int]) => {
+ (state: GroupState[Int]) => {
state.update(5)
intercept[IllegalArgumentException] {
state.setTimeoutTimestamp(currentBatchWatermark - 1) // try to set to < watermark
@@ -387,7 +387,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
test("StateStoreUpdater - rows are cloned before writing to StateStore") {
// function for running count
- val func = (key: Int, values: Iterator[Int], state: KeyedState[Int]) => {
+ val func = (key: Int, values: Iterator[Int], state: GroupState[Int]) => {
state.update(state.getOption.getOrElse(0) + values.size)
Iterator.empty
}
@@ -404,7 +404,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
test("flatMapGroupsWithState - streaming") {
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count if state is defined, otherwise does not return anything
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
if (count == 3) {
@@ -420,7 +420,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
val result =
inputData.toDS()
.groupByKey(x => x)
- .flatMapGroupsWithState(Update, KeyedStateTimeout.NoTimeout)(stateFunc)
+ .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc)
testStream(result, Update)(
AddData(inputData, "a"),
@@ -446,7 +446,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count if state is defined, otherwise does not return anything
// Additionally, it updates state lazily as the returned iterator get consumed
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
values.flatMap { _ =>
val count = state.getOption.map(_.count).getOrElse(0L) + 1
if (count == 3) {
@@ -463,7 +463,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
val result =
inputData.toDS()
.groupByKey(x => x)
- .flatMapGroupsWithState(Update, KeyedStateTimeout.NoTimeout)(stateFunc)
+ .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc)
testStream(result, Update)(
AddData(inputData, "a", "a", "b"),
CheckLastBatch(("a", "1"), ("a", "2"), ("b", "1")),
@@ -481,7 +481,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
test("flatMapGroupsWithState - streaming + aggregation") {
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed)
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
if (count == 3) {
@@ -497,7 +497,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
val result =
inputData.toDS()
.groupByKey(x => x)
- .flatMapGroupsWithState(Append, KeyedStateTimeout.NoTimeout)(stateFunc)
+ .flatMapGroupsWithState(Append, GroupStateTimeout.NoTimeout)(stateFunc)
.groupByKey(_._1)
.count()
@@ -524,20 +524,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
test("flatMapGroupsWithState - batch") {
// Function that returns running count only if its even, otherwise does not return
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
if (state.exists) throw new IllegalArgumentException("state.exists should be false")
Iterator((key, values.size))
}
val df = Seq("a", "a", "b").toDS
.groupByKey(x => x)
- .flatMapGroupsWithState(Update, KeyedStateTimeout.NoTimeout)(stateFunc).toDF
+ .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc).toDF
checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF)
}
test("flatMapGroupsWithState - streaming with processing time timeout") {
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed)
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
if (state.hasTimedOut) {
state.remove()
Iterator((key, "-1"))
@@ -594,7 +594,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
val stateFunc = (
key: String,
values: Iterator[(String, Long)],
- state: KeyedState[Long]) => {
+ state: GroupState[Long]) => {
val timeoutDelay = 5
if (key != "a") {
Iterator.empty
@@ -637,7 +637,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
test("mapGroupsWithState - streaming") {
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed)
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
if (count == 3) {
@@ -676,7 +676,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
test("mapGroupsWithState - batch") {
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
if (state.exists) throw new IllegalArgumentException("state.exists should be false")
(key, values.size)
}
@@ -690,7 +690,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
testQuietly("StateStore.abort on task failure handling") {
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
if (FlatMapGroupsWithStateSuite.failInTask) throw new Exception("expected failure")
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
state.update(RunningCount(count))
@@ -724,7 +724,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
test("output partitioning is unknown") {
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => key
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => key
val inputData = MemoryStream[String]
val result = inputData.toDS.groupByKey(x => x).mapGroupsWithState(stateFunc)
testStream(result, Update)(
@@ -735,13 +735,13 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
test("disallow complete mode") {
- val stateFunc = (key: String, values: Iterator[String], state: KeyedState[Int]) => {
+ val stateFunc = (key: String, values: Iterator[String], state: GroupState[Int]) => {
Iterator[String]()
}
var e = intercept[IllegalArgumentException] {
MemoryStream[String].toDS().groupByKey(x => x).flatMapGroupsWithState(
- OutputMode.Complete, KeyedStateTimeout.NoTimeout)(stateFunc)
+ OutputMode.Complete, GroupStateTimeout.NoTimeout)(stateFunc)
}
assert(e.getMessage === "The output mode of function should be append or update")
@@ -750,20 +750,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
override def call(
key: String,
values: JIterator[String],
- state: KeyedState[Int]): JIterator[String] = { null }
+ state: GroupState[Int]): JIterator[String] = { null }
}
e = intercept[IllegalArgumentException] {
MemoryStream[String].toDS().groupByKey(x => x).flatMapGroupsWithState(
javaStateFunc, OutputMode.Complete,
- implicitly[Encoder[Int]], implicitly[Encoder[String]], KeyedStateTimeout.NoTimeout)
+ implicitly[Encoder[Int]], implicitly[Encoder[String]], GroupStateTimeout.NoTimeout)
}
assert(e.getMessage === "The output mode of function should be append or update")
}
def testStateUpdateWithData(
testName: String,
- stateUpdates: KeyedState[Int] => Unit,
- timeoutConf: KeyedStateTimeout,
+ stateUpdates: GroupState[Int] => Unit,
+ timeoutConf: GroupStateTimeout,
priorState: Option[Int],
priorTimeoutTimestamp: Long = NO_TIMESTAMP,
expectedState: Option[Int] = None,
@@ -773,7 +773,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
return // there can be no prior timestamp, when there is no prior state
}
test(s"StateStoreUpdater - updates with data - $testName") {
- val mapGroupsFunc = (key: Int, values: Iterator[Int], state: KeyedState[Int]) => {
+ val mapGroupsFunc = (key: Int, values: Iterator[Int], state: GroupState[Int]) => {
assert(state.hasTimedOut === false, "hasTimedOut not false")
assert(values.nonEmpty, "Some value is expected")
stateUpdates(state)
@@ -787,14 +787,14 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
def testStateUpdateWithTimeout(
testName: String,
- stateUpdates: KeyedState[Int] => Unit,
- timeoutConf: KeyedStateTimeout,
+ stateUpdates: GroupState[Int] => Unit,
+ timeoutConf: GroupStateTimeout,
priorTimeoutTimestamp: Long,
expectedState: Option[Int],
expectedTimeoutTimestamp: Long = NO_TIMESTAMP): Unit = {
test(s"StateStoreUpdater - updates for timeout - $testName") {
- val mapGroupsFunc = (key: Int, values: Iterator[Int], state: KeyedState[Int]) => {
+ val mapGroupsFunc = (key: Int, values: Iterator[Int], state: GroupState[Int]) => {
assert(state.hasTimedOut === true, "hasTimedOut not true")
assert(values.isEmpty, "values not empty")
stateUpdates(state)
@@ -808,8 +808,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
def testStateUpdate(
testTimeoutUpdates: Boolean,
- mapGroupsFunc: (Int, Iterator[Int], KeyedState[Int]) => Iterator[Int],
- timeoutConf: KeyedStateTimeout,
+ mapGroupsFunc: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int],
+ timeoutConf: GroupStateTimeout,
priorState: Option[Int],
priorTimeoutTimestamp: Long,
expectedState: Option[Int],
@@ -848,8 +848,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
def newFlatMapGroupsWithStateExec(
- func: (Int, Iterator[Int], KeyedState[Int]) => Iterator[Int],
- timeoutType: KeyedStateTimeout = KeyedStateTimeout.NoTimeout,
+ func: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int],
+ timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout,
batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = {
MemoryStream[Int]
.toDS
@@ -863,7 +863,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}.get
}
- def testTimeoutDurationNotAllowed[T <: Exception: Manifest](state: KeyedStateImpl[_]): Unit = {
+ def testTimeoutDurationNotAllowed[T <: Exception: Manifest](state: GroupStateImpl[_]): Unit = {
val prevTimestamp = state.getTimeoutTimestamp
intercept[T] { state.setTimeoutDuration(1000) }
assert(state.getTimeoutTimestamp === prevTimestamp)
@@ -871,7 +871,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
assert(state.getTimeoutTimestamp === prevTimestamp)
}
- def testTimeoutTimestampNotAllowed[T <: Exception: Manifest](state: KeyedStateImpl[_]): Unit = {
+ def testTimeoutTimestampNotAllowed[T <: Exception: Manifest](state: GroupStateImpl[_]): Unit = {
val prevTimestamp = state.getTimeoutTimestamp
intercept[T] { state.setTimeoutTimestamp(2000) }
assert(state.getTimeoutTimestamp === prevTimestamp)