aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-03-19 14:07:49 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-03-19 14:07:49 -0700
commit990af630d0d569880edd9c7ce9932e10037a28ab (patch)
tree3c25483808ca877693f42d3d10ebd49987e86645 /sql/catalyst
parent0ee9fbf51ac863e015d57ae7824a39bd3b36141a (diff)
downloadspark-990af630d0d569880edd9c7ce9932e10037a28ab.tar.gz
spark-990af630d0d569880edd9c7ce9932e10037a28ab.tar.bz2
spark-990af630d0d569880edd9c7ce9932e10037a28ab.zip
[SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithState
## What changes were proposed in this pull request? When a key does not get any new data in `mapGroupsWithState`, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.). Timeouts can be either based on processing time or event time. This JIRA is for processing time, but defines the high level API design for both. The usage would look like this. ``` def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = { ... state.setTimeoutDuration(10000) ... } dataset // type is Dataset[T] .groupByKey[K](keyingFunc) // generates KeyValueGroupedDataset[K, T] .mapGroupsWithState[S, U]( func = stateFunction, timeout = KeyedStateTimeout.withProcessingTime) // returns Dataset[U] ``` Note the following design aspects. - The timeout type is provided as a param in mapGroupsWithState as a parameter global to all the keys. This is so that the planner knows this at planning time, and accordingly optimize the execution based on whether to saves extra info in state or not (e.g. timeout durations or timestamps). - The exact timeout duration is provided inside the function call so that it can be customized on a per key basis. - When the timeout occurs for a key, the function is called with no values, and KeyedState.isTimingOut() set to true. - The timeout is reset for key every time the function is called on the key, that is, when the key has new data, or the key has timed out. So the user has to set the timeout duration everytime the function is called, otherwise there will not be any timeout set. Guarantees provided on timeout of key, when timeout duration is D ms: - Timeout will never be called before real clock time has advanced by D ms - Timeout will be called eventually when there is a trigger with any data in it (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur. For example, if there is no data in the stream (for any key) for a while, then the timeout will not be hit. Implementation details: - Added new param to `mapGroupsWithState` for timeout - Added new method to `StateStore` to filter data based on timeout timestamp - Changed the internal map type of `HDFSBackedStateStore` from Java's `HashMap` to `ConcurrentHashMap` as the latter allows weakly-consistent fail-safe iterators on the map data. See comments in code for more details. - Refactored logic of `MapGroupsWithStateExec` to - Save timeout info to state store for each key that has data. - Then, filter states that should be timed out based on the current batch processing timestamp. - Moved KeyedState for `o.a.s.sql` to `o.a.s.sql.streaming`. I remember that this was a feedback in the MapGroupsWithState PR that I had forgotten to address. ## How was this patch tested? New unit tests in - MapGroupsWithStateSuite for timeouts. - StateStoreSuite for new APIs in StateStore. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17179 from tdas/mapgroupwithstate-timeout.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java42
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala30
-rw-r--r--sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java29
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala80
5 files changed, 136 insertions, 47 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
new file mode 100644
index 0000000000..cf112f2e02
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
+
+/**
+ * Represents the type of timeouts possible for the Dataset operations
+ * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
+ * `KeyedState` for more details.
+ *
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class KeyedStateTimeout {
+
+ /** Timeout based on processing time. */
+ public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
+
+ /** No timeout */
+ public static KeyedStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 36bf3017d4..771ac28e51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -951,7 +951,7 @@ case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
override def eval(input: InternalRow): Any = {
val result = child.eval(input)
if (result == null) {
- throw new RuntimeException(errMsg);
+ throw new RuntimeException(errMsg)
}
result
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 7f4462e583..d1f95faf2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode }
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -353,6 +353,10 @@ case class MapGroups(
/** Internal class representing State */
trait LogicalKeyedState[S]
+/** Possible types of timeouts used in FlatMapGroupsWithState */
+case object NoTimeout extends KeyedStateTimeout
+case object ProcessingTimeTimeout extends KeyedStateTimeout
+
/** Factory for constructing new `MapGroupsWithState` nodes. */
object FlatMapGroupsWithState {
def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
@@ -361,7 +365,10 @@ object FlatMapGroupsWithState {
dataAttributes: Seq[Attribute],
outputMode: OutputMode,
isMapGroupsWithState: Boolean,
+ timeout: KeyedStateTimeout,
child: LogicalPlan): LogicalPlan = {
+ val encoder = encoderFor[S]
+
val mapped = new FlatMapGroupsWithState(
func,
UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes),
@@ -369,11 +376,11 @@ object FlatMapGroupsWithState {
groupingAttributes,
dataAttributes,
CatalystSerde.generateObjAttr[U],
- encoderFor[S].resolveAndBind().deserializer,
- encoderFor[S].namedExpressions,
+ encoder.asInstanceOf[ExpressionEncoder[Any]],
outputMode,
- child,
- isMapGroupsWithState)
+ isMapGroupsWithState,
+ timeout,
+ child)
CatalystSerde.serialize[U](mapped)
}
}
@@ -384,15 +391,16 @@ object FlatMapGroupsWithState {
* Func is invoked with an object representation of the grouping key an iterator containing the
* object representation of all the rows with that key.
*
+ * @param func function called on each group
* @param keyDeserializer used to extract the key object for each group.
* @param valueDeserializer used to extract the items in the iterator from an input row.
* @param groupingAttributes used to group the data
* @param dataAttributes used to read the data
* @param outputObjAttr used to define the output object
- * @param stateDeserializer used to deserialize state before calling `func`
- * @param stateSerializer used to serialize updated state after calling `func`
+ * @param stateEncoder used to serialize/deserialize state before calling `func`
* @param outputMode the output mode of `func`
* @param isMapGroupsWithState whether it is created by the `mapGroupsWithState` method
+ * @param timeout used to timeout groups that have not received data in a while
*/
case class FlatMapGroupsWithState(
func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
@@ -401,11 +409,11 @@ case class FlatMapGroupsWithState(
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
outputObjAttr: Attribute,
- stateDeserializer: Expression,
- stateSerializer: Seq[NamedExpression],
+ stateEncoder: ExpressionEncoder[Any],
outputMode: OutputMode,
- child: LogicalPlan,
- isMapGroupsWithState: Boolean = false) extends UnaryNode with ObjectProducer {
+ isMapGroupsWithState: Boolean = false,
+ timeout: KeyedStateTimeout,
+ child: LogicalPlan) extends UnaryNode with ObjectProducer {
if (isMapGroupsWithState) {
assert(outputMode == OutputMode.Update)
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
new file mode 100644
index 0000000000..02c94b0b32
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
+import org.junit.Test;
+
+public class JavaKeyedStateTimeoutSuite {
+
+ @Test
+ public void testTimeouts() {
+ assert(KeyedStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 200c39f43a..08216e2660 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -144,14 +144,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertSupportedInBatchPlan(
s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation))
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, null,
+ batchRelation))
assertSupportedInBatchPlan(
s"flatMapGroupsWithState - multiple flatMapGroupsWithState($funcMode)s on batch relation",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode,
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation)))
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false,
+ null, batchRelation)))
}
// FlatMapGroupsWithState(Update) in streaming without aggregation
@@ -159,14 +161,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
"on streaming relation without aggregation in update mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Update)
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
"on streaming relation without aggregation in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Append,
expectedMsgs = Seq("flatMapGroupsWithState in update mode", "Append"))
@@ -174,7 +178,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
"on streaming relation without aggregation in complete mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Complete,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -186,7 +191,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
s"with aggregation in $outputMode mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
@@ -197,14 +202,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
"on streaming relation without aggregation in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Append)
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
"on streaming relation without aggregation in update mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Update,
expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update"))
@@ -217,7 +224,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
Seq(attributeWithWatermark),
aggExprs("c"),
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation)),
outputMode = outputMode)
}
@@ -225,7 +233,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
s"on streaming relation after aggregation in $outputMode mode",
- FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append,
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
+ isMapGroupsWithState = false, null,
Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
expectedMsgs = Seq("flatMapGroupsWithState", "after aggregation"))
@@ -235,7 +244,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - " +
"flatMapGroupsWithState(Update) on streaming relation in complete mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Complete,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -248,7 +258,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation inside " +
s"streaming relation in $outputMode output mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation),
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false,
+ null, batchRelation),
outputMode = outputMode
)
}
@@ -258,19 +269,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " +
"in append mode",
- FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append,
- FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
+ isMapGroupsWithState = false, null,
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
+ isMapGroupsWithState = false, null, streamRelation)),
outputMode = Append)
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" +
" are not in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation)),
outputMode = Append,
expectedMsgs = Seq("multiple flatMapGroupsWithState", "append"))
@@ -279,8 +291,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - mapGroupsWithState " +
"on streaming relation without aggregation in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
+ streamRelation),
outputMode = Append,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -290,8 +302,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - mapGroupsWithState " +
"on streaming relation without aggregation in complete mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
+ streamRelation),
outputMode = Complete,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -301,10 +313,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertNotSupportedInStreamingPlan(
"mapGroupsWithState - mapGroupsWithState on streaming relation " +
s"with aggregation in $outputMode mode",
- FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
- Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation),
- isMapGroupsWithState = true),
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Update,
+ isMapGroupsWithState = true, null,
+ Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
expectedMsgs = Seq("mapGroupsWithState", "with aggregation"))
}
@@ -314,11 +325,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are " +
"in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = true),
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
+ streamRelation)),
outputMode = Append,
expectedMsgs = Seq("multiple mapGroupsWithStates"))
@@ -327,11 +337,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - " +
"mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = false),
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation)
+ ),
outputMode = Append,
expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates"))