aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-03-08 13:18:07 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2017-03-08 13:18:07 -0800
commit1bf9012380de2aa7bdf39220b55748defde8b700 (patch)
tree3efc5be6f6506eef72a98c08132d95c0ce9f8fcd /sql/catalyst
parente9e2c612d58a19ddcb4b6abfb7389a4b0f7ef6f8 (diff)
downloadspark-1bf9012380de2aa7bdf39220b55748defde8b700.tar.gz
spark-1bf9012380de2aa7bdf39220b55748defde8b700.tar.bz2
spark-1bf9012380de2aa7bdf39220b55748defde8b700.zip
[SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow invalid cases
## What changes were proposed in this pull request? Add a output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`. `UnsupportedOperationChecker` is modified to disallow unsupported cases. - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed. - For streaming (map/flatMap)GroupsWithState, see the following table: | Operators | Supported Query Output Mode | | ------------- | ------------- | | flatMapGroupsWithState(Update) without aggregation | Update | | flatMapGroupsWithState(Update) with aggregation | None | | flatMapGroupsWithState(Append) without aggregation | Append | | flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete | | flatMapGroupsWithState(Append) after aggregation | None | | Multiple flatMapGroupsWithState(Append)s | Append | | Multiple mapGroupsWithStates | None | | Mxing mapGroupsWithStates and flatMapGroupsWithStates | None | | Other cases of multiple flatMapGroupsWithState | None | ## How was this patch tested? The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState: ``` [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds) ``` Author: Shixiong Zhu <shixiong@databricks.com> Closes #17197 from zsxwing/mapgroups-check.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala77
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala24
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala15
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala203
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala48
5 files changed, 344 insertions, 23 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 397f5cfe2a..a9ff61e0e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -51,6 +51,37 @@ object UnsupportedOperationChecker {
subplan.collect { case a: Aggregate if a.isStreaming => a }
}
+ val mapGroupsWithStates = plan.collect {
+ case f: FlatMapGroupsWithState if f.isStreaming && f.isMapGroupsWithState => f
+ }
+
+ // Disallow multiple `mapGroupsWithState`s.
+ if (mapGroupsWithStates.size >= 2) {
+ throwError(
+ "Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets")(plan)
+ }
+
+ val flatMapGroupsWithStates = plan.collect {
+ case f: FlatMapGroupsWithState if f.isStreaming && !f.isMapGroupsWithState => f
+ }
+
+ // Disallow mixing `mapGroupsWithState`s and `flatMapGroupsWithState`s
+ if (mapGroupsWithStates.nonEmpty && flatMapGroupsWithStates.nonEmpty) {
+ throwError(
+ "Mixing mapGroupsWithStates and flatMapGroupsWithStates are not supported on a " +
+ "streaming DataFrames/Datasets")(plan)
+ }
+
+ // Only allow multiple `FlatMapGroupsWithState(Append)`s in append mode.
+ if (flatMapGroupsWithStates.size >= 2 && (
+ outputMode != InternalOutputModes.Append ||
+ flatMapGroupsWithStates.exists(_.outputMode != InternalOutputModes.Append)
+ )) {
+ throwError(
+ "Multiple flatMapGroupsWithStates are not supported when they are not all in append mode" +
+ " or the output mode is not append on a streaming DataFrames/Datasets")(plan)
+ }
+
// Disallow multiple streaming aggregations
val aggregates = collectStreamingAggregates(plan)
@@ -116,9 +147,49 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
- case m: MapGroupsWithState if collectStreamingAggregates(m).nonEmpty =>
- throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " +
- "streaming DataFrame/Dataset")
+ // mapGroupsWithState: Allowed only when no aggregation + Update output mode
+ case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
+ if (collectStreamingAggregates(plan).isEmpty) {
+ if (outputMode != InternalOutputModes.Update) {
+ throwError("mapGroupsWithState is not supported with " +
+ s"$outputMode output mode on a streaming DataFrame/Dataset")
+ } else {
+ // Allowed when no aggregation + Update output mode
+ }
+ } else {
+ throwError("mapGroupsWithState is not supported with aggregation " +
+ "on a streaming DataFrame/Dataset")
+ }
+
+ // flatMapGroupsWithState without aggregation
+ case m: FlatMapGroupsWithState
+ if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
+ m.outputMode match {
+ case InternalOutputModes.Update =>
+ if (outputMode != InternalOutputModes.Update) {
+ throwError("flatMapGroupsWithState in update mode is not supported with " +
+ s"$outputMode output mode on a streaming DataFrame/Dataset")
+ }
+ case InternalOutputModes.Append =>
+ if (outputMode != InternalOutputModes.Append) {
+ throwError("flatMapGroupsWithState in append mode is not supported with " +
+ s"$outputMode output mode on a streaming DataFrame/Dataset")
+ }
+ }
+
+ // flatMapGroupsWithState(Update) with aggregation
+ case m: FlatMapGroupsWithState
+ if m.isStreaming && m.outputMode == InternalOutputModes.Update
+ && collectStreamingAggregates(plan).nonEmpty =>
+ throwError("flatMapGroupsWithState in update mode is not supported with " +
+ "aggregation on a streaming DataFrame/Dataset")
+
+ // flatMapGroupsWithState(Append) with aggregation
+ case m: FlatMapGroupsWithState
+ if m.isStreaming && m.outputMode == InternalOutputModes.Append
+ && collectStreamingAggregates(m).nonEmpty =>
+ throwError("flatMapGroupsWithState in append mode is not supported after " +
+ s"aggregation on a streaming DataFrame/Dataset")
case d: Deduplicate if collectStreamingAggregates(d).nonEmpty =>
throwError("dropDuplicates is not supported after aggregation on a " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 0be4823bbc..617239f56c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
object CatalystSerde {
@@ -317,13 +318,15 @@ case class MapGroups(
trait LogicalKeyedState[S]
/** Factory for constructing new `MapGroupsWithState` nodes. */
-object MapGroupsWithState {
+object FlatMapGroupsWithState {
def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
+ outputMode: OutputMode,
+ isMapGroupsWithState: Boolean,
child: LogicalPlan): LogicalPlan = {
- val mapped = new MapGroupsWithState(
+ val mapped = new FlatMapGroupsWithState(
func,
UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes),
UnresolvedDeserializer(encoderFor[V].deserializer, dataAttributes),
@@ -332,7 +335,9 @@ object MapGroupsWithState {
CatalystSerde.generateObjAttr[U],
encoderFor[S].resolveAndBind().deserializer,
encoderFor[S].namedExpressions,
- child)
+ outputMode,
+ child,
+ isMapGroupsWithState)
CatalystSerde.serialize[U](mapped)
}
}
@@ -350,8 +355,10 @@ object MapGroupsWithState {
* @param outputObjAttr used to define the output object
* @param stateDeserializer used to deserialize state before calling `func`
* @param stateSerializer used to serialize updated state after calling `func`
+ * @param outputMode the output mode of `func`
+ * @param isMapGroupsWithState whether it is created by the `mapGroupsWithState` method
*/
-case class MapGroupsWithState(
+case class FlatMapGroupsWithState(
func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
@@ -360,7 +367,14 @@ case class MapGroupsWithState(
outputObjAttr: Attribute,
stateDeserializer: Expression,
stateSerializer: Seq[NamedExpression],
- child: LogicalPlan) extends UnaryNode with ObjectProducer
+ outputMode: OutputMode,
+ child: LogicalPlan,
+ isMapGroupsWithState: Boolean = false) extends UnaryNode with ObjectProducer {
+
+ if (isMapGroupsWithState) {
+ assert(outputMode == OutputMode.Update)
+ }
+}
/** Factory for constructing new `FlatMapGroupsInR` nodes. */
object FlatMapGroupsInR {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
index 351bd6fff4..bdf2baf736 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
@@ -44,4 +44,19 @@ private[sql] object InternalOutputModes {
* aggregations, it will be equivalent to `Append` mode.
*/
case object Update extends OutputMode
+
+
+ def apply(outputMode: String): OutputMode = {
+ outputMode.toLowerCase match {
+ case "append" =>
+ OutputMode.Append
+ case "complete" =>
+ OutputMode.Complete
+ case "update" =>
+ OutputMode.Update
+ case _ =>
+ throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
+ "Accepted output modes are 'append', 'complete', 'update'")
+ }
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 82be69a0f7..200c39f43a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{MapGroupsWithState, _}
+import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder}
@@ -138,29 +138,202 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Complete,
expectedMsgs = Seq("distinct aggregation"))
- // MapGroupsWithState: Not supported after a streaming aggregation
val att = new AttributeReference(name = "a", dataType = LongType)()
- assertSupportedInBatchPlan(
- "mapGroupsWithState - mapGroupsWithState on batch relation",
- MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation))
+ // FlatMapGroupsWithState: Both function modes equivalent and supported in batch.
+ for (funcMode <- Seq(Append, Update)) {
+ assertSupportedInBatchPlan(
+ s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation))
+
+ assertSupportedInBatchPlan(
+ s"flatMapGroupsWithState - multiple flatMapGroupsWithState($funcMode)s on batch relation",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode,
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation)))
+ }
+
+ // FlatMapGroupsWithState(Update) in streaming without aggregation
+ assertSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
+ "on streaming relation without aggregation in update mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ outputMode = Update)
+
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
+ "on streaming relation without aggregation in append mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ outputMode = Append,
+ expectedMsgs = Seq("flatMapGroupsWithState in update mode", "Append"))
+
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
+ "on streaming relation without aggregation in complete mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ outputMode = Complete,
+ // Disallowed by the aggregation check but let's still keep this test in case it's broken in
+ // future.
+ expectedMsgs = Seq("Complete"))
+
+ // FlatMapGroupsWithState(Update) in streaming with aggregation
+ for (outputMode <- Seq(Append, Update, Complete)) {
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
+ s"with aggregation in $outputMode mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+ outputMode = outputMode,
+ expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
+ }
+ // FlatMapGroupsWithState(Append) in streaming without aggregation
assertSupportedInStreamingPlan(
- "mapGroupsWithState - mapGroupsWithState on streaming relation before aggregation",
- MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), streamRelation),
+ "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
+ "on streaming relation without aggregation in append mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
outputMode = Append)
assertNotSupportedInStreamingPlan(
- "mapGroupsWithState - mapGroupsWithState on streaming relation after aggregation",
- MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att),
- Aggregate(Nil, aggExprs("c"), streamRelation)),
+ "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
+ "on streaming relation without aggregation in update mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ outputMode = Update,
+ expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update"))
+
+ // FlatMapGroupsWithState(Append) in streaming with aggregation
+ for (outputMode <- Seq(Append, Update, Complete)) {
+ assertSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
+ s"on streaming relation before aggregation in $outputMode mode",
+ Aggregate(
+ Seq(attributeWithWatermark),
+ aggExprs("c"),
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ outputMode = outputMode)
+ }
+
+ for (outputMode <- Seq(Append, Update)) {
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
+ s"on streaming relation after aggregation in $outputMode mode",
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append,
+ Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
+ outputMode = outputMode,
+ expectedMsgs = Seq("flatMapGroupsWithState", "after aggregation"))
+ }
+
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - " +
+ "flatMapGroupsWithState(Update) on streaming relation in complete mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
outputMode = Complete,
- expectedMsgs = Seq("(map/flatMap)GroupsWithState"))
+ // Disallowed by the aggregation check but let's still keep this test in case it's broken in
+ // future.
+ expectedMsgs = Seq("Complete"))
+ // FlatMapGroupsWithState inside batch relation should always be allowed
+ for (funcMode <- Seq(Append, Update)) {
+ for (outputMode <- Seq(Append, Update)) { // Complete is not supported without aggregation
+ assertSupportedInStreamingPlan(
+ s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation inside " +
+ s"streaming relation in $outputMode output mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation),
+ outputMode = outputMode
+ )
+ }
+ }
+
+ // multiple FlatMapGroupsWithStates
assertSupportedInStreamingPlan(
- "mapGroupsWithState - mapGroupsWithState on batch relation inside streaming relation",
- MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation),
- outputMode = Append
- )
+ "flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " +
+ "in append mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append,
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ outputMode = Append)
+
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" +
+ " are not in append mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ outputMode = Append,
+ expectedMsgs = Seq("multiple flatMapGroupsWithState", "append"))
+
+ // mapGroupsWithState
+ assertNotSupportedInStreamingPlan(
+ "mapGroupsWithState - mapGroupsWithState " +
+ "on streaming relation without aggregation in append mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
+ isMapGroupsWithState = true),
+ outputMode = Append,
+ // Disallowed by the aggregation check but let's still keep this test in case it's broken in
+ // future.
+ expectedMsgs = Seq("mapGroupsWithState", "append"))
+
+ assertNotSupportedInStreamingPlan(
+ "mapGroupsWithState - mapGroupsWithState " +
+ "on streaming relation without aggregation in complete mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
+ isMapGroupsWithState = true),
+ outputMode = Complete,
+ // Disallowed by the aggregation check but let's still keep this test in case it's broken in
+ // future.
+ expectedMsgs = Seq("Complete"))
+
+ for (outputMode <- Seq(Append, Update, Complete)) {
+ assertNotSupportedInStreamingPlan(
+ "mapGroupsWithState - mapGroupsWithState on streaming relation " +
+ s"with aggregation in $outputMode mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation),
+ isMapGroupsWithState = true),
+ outputMode = outputMode,
+ expectedMsgs = Seq("mapGroupsWithState", "with aggregation"))
+ }
+
+ // multiple mapGroupsWithStates
+ assertNotSupportedInStreamingPlan(
+ "mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are " +
+ "in append mode",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
+ isMapGroupsWithState = true),
+ isMapGroupsWithState = true),
+ outputMode = Append,
+ expectedMsgs = Seq("multiple mapGroupsWithStates"))
+
+ // mixing mapGroupsWithStates and flatMapGroupsWithStates
+ assertNotSupportedInStreamingPlan(
+ "mapGroupsWithState - " +
+ "mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation",
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ FlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
+ isMapGroupsWithState = false),
+ isMapGroupsWithState = true),
+ outputMode = Append,
+ expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates"))
// Deduplicate
assertSupportedInStreamingPlan(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala
new file mode 100644
index 0000000000..201dac35ed
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.OutputMode
+
+class InternalOutputModesSuite extends SparkFunSuite {
+
+ test("supported strings") {
+ def testMode(outputMode: String, expected: OutputMode): Unit = {
+ assert(InternalOutputModes(outputMode) === expected)
+ }
+
+ testMode("append", OutputMode.Append)
+ testMode("Append", OutputMode.Append)
+ testMode("complete", OutputMode.Complete)
+ testMode("Complete", OutputMode.Complete)
+ testMode("update", OutputMode.Update)
+ testMode("Update", OutputMode.Update)
+ }
+
+ test("unsupported strings") {
+ def testMode(outputMode: String): Unit = {
+ val acceptedModes = Seq("append", "update", "complete")
+ val e = intercept[IllegalArgumentException](InternalOutputModes(outputMode))
+ (Seq("output mode", "unknown", outputMode) ++ acceptedModes).foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+ }
+ testMode("Xyz")
+ }
+}