diff options
Diffstat (limited to 'sql/catalyst')
5 files changed, 344 insertions, 23 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 397f5cfe2a..a9ff61e0e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -51,6 +51,37 @@ object UnsupportedOperationChecker { subplan.collect { case a: Aggregate if a.isStreaming => a } } + val mapGroupsWithStates = plan.collect { + case f: FlatMapGroupsWithState if f.isStreaming && f.isMapGroupsWithState => f + } + + // Disallow multiple `mapGroupsWithState`s. + if (mapGroupsWithStates.size >= 2) { + throwError( + "Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets")(plan) + } + + val flatMapGroupsWithStates = plan.collect { + case f: FlatMapGroupsWithState if f.isStreaming && !f.isMapGroupsWithState => f + } + + // Disallow mixing `mapGroupsWithState`s and `flatMapGroupsWithState`s + if (mapGroupsWithStates.nonEmpty && flatMapGroupsWithStates.nonEmpty) { + throwError( + "Mixing mapGroupsWithStates and flatMapGroupsWithStates are not supported on a " + + "streaming DataFrames/Datasets")(plan) + } + + // Only allow multiple `FlatMapGroupsWithState(Append)`s in append mode. + if (flatMapGroupsWithStates.size >= 2 && ( + outputMode != InternalOutputModes.Append || + flatMapGroupsWithStates.exists(_.outputMode != InternalOutputModes.Append) + )) { + throwError( + "Multiple flatMapGroupsWithStates are not supported when they are not all in append mode" + + " or the output mode is not append on a streaming DataFrames/Datasets")(plan) + } + // Disallow multiple streaming aggregations val aggregates = collectStreamingAggregates(plan) @@ -116,9 +147,49 @@ object UnsupportedOperationChecker { throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") - case m: MapGroupsWithState if collectStreamingAggregates(m).nonEmpty => - throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " + - "streaming DataFrame/Dataset") + // mapGroupsWithState: Allowed only when no aggregation + Update output mode + case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState => + if (collectStreamingAggregates(plan).isEmpty) { + if (outputMode != InternalOutputModes.Update) { + throwError("mapGroupsWithState is not supported with " + + s"$outputMode output mode on a streaming DataFrame/Dataset") + } else { + // Allowed when no aggregation + Update output mode + } + } else { + throwError("mapGroupsWithState is not supported with aggregation " + + "on a streaming DataFrame/Dataset") + } + + // flatMapGroupsWithState without aggregation + case m: FlatMapGroupsWithState + if m.isStreaming && collectStreamingAggregates(plan).isEmpty => + m.outputMode match { + case InternalOutputModes.Update => + if (outputMode != InternalOutputModes.Update) { + throwError("flatMapGroupsWithState in update mode is not supported with " + + s"$outputMode output mode on a streaming DataFrame/Dataset") + } + case InternalOutputModes.Append => + if (outputMode != InternalOutputModes.Append) { + throwError("flatMapGroupsWithState in append mode is not supported with " + + s"$outputMode output mode on a streaming DataFrame/Dataset") + } + } + + // flatMapGroupsWithState(Update) with aggregation + case m: FlatMapGroupsWithState + if m.isStreaming && m.outputMode == InternalOutputModes.Update + && collectStreamingAggregates(plan).nonEmpty => + throwError("flatMapGroupsWithState in update mode is not supported with " + + "aggregation on a streaming DataFrame/Dataset") + + // flatMapGroupsWithState(Append) with aggregation + case m: FlatMapGroupsWithState + if m.isStreaming && m.outputMode == InternalOutputModes.Append + && collectStreamingAggregates(m).nonEmpty => + throwError("flatMapGroupsWithState in append mode is not supported after " + + s"aggregation on a streaming DataFrame/Dataset") case d: Deduplicate if collectStreamingAggregates(d).nonEmpty => throwError("dropDuplicates is not supported after aggregation on a " + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 0be4823bbc..617239f56c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects.Invoke +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types._ object CatalystSerde { @@ -317,13 +318,15 @@ case class MapGroups( trait LogicalKeyedState[S] /** Factory for constructing new `MapGroupsWithState` nodes. */ -object MapGroupsWithState { +object FlatMapGroupsWithState { def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder]( func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any], groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], + outputMode: OutputMode, + isMapGroupsWithState: Boolean, child: LogicalPlan): LogicalPlan = { - val mapped = new MapGroupsWithState( + val mapped = new FlatMapGroupsWithState( func, UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes), UnresolvedDeserializer(encoderFor[V].deserializer, dataAttributes), @@ -332,7 +335,9 @@ object MapGroupsWithState { CatalystSerde.generateObjAttr[U], encoderFor[S].resolveAndBind().deserializer, encoderFor[S].namedExpressions, - child) + outputMode, + child, + isMapGroupsWithState) CatalystSerde.serialize[U](mapped) } } @@ -350,8 +355,10 @@ object MapGroupsWithState { * @param outputObjAttr used to define the output object * @param stateDeserializer used to deserialize state before calling `func` * @param stateSerializer used to serialize updated state after calling `func` + * @param outputMode the output mode of `func` + * @param isMapGroupsWithState whether it is created by the `mapGroupsWithState` method */ -case class MapGroupsWithState( +case class FlatMapGroupsWithState( func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any], keyDeserializer: Expression, valueDeserializer: Expression, @@ -360,7 +367,14 @@ case class MapGroupsWithState( outputObjAttr: Attribute, stateDeserializer: Expression, stateSerializer: Seq[NamedExpression], - child: LogicalPlan) extends UnaryNode with ObjectProducer + outputMode: OutputMode, + child: LogicalPlan, + isMapGroupsWithState: Boolean = false) extends UnaryNode with ObjectProducer { + + if (isMapGroupsWithState) { + assert(outputMode == OutputMode.Update) + } +} /** Factory for constructing new `FlatMapGroupsInR` nodes. */ object FlatMapGroupsInR { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala index 351bd6fff4..bdf2baf736 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala @@ -44,4 +44,19 @@ private[sql] object InternalOutputModes { * aggregations, it will be equivalent to `Append` mode. */ case object Update extends OutputMode + + + def apply(outputMode: String): OutputMode = { + outputMode.toLowerCase match { + case "append" => + OutputMode.Append + case "complete" => + OutputMode.Complete + case "update" => + OutputMode.Update + case _ => + throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + + "Accepted output modes are 'append', 'complete', 'update'") + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 82be69a0f7..200c39f43a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{MapGroupsWithState, _} +import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder} @@ -138,29 +138,202 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Complete, expectedMsgs = Seq("distinct aggregation")) - // MapGroupsWithState: Not supported after a streaming aggregation val att = new AttributeReference(name = "a", dataType = LongType)() - assertSupportedInBatchPlan( - "mapGroupsWithState - mapGroupsWithState on batch relation", - MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation)) + // FlatMapGroupsWithState: Both function modes equivalent and supported in batch. + for (funcMode <- Seq(Append, Update)) { + assertSupportedInBatchPlan( + s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation)) + + assertSupportedInBatchPlan( + s"flatMapGroupsWithState - multiple flatMapGroupsWithState($funcMode)s on batch relation", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation))) + } + + // FlatMapGroupsWithState(Update) in streaming without aggregation + assertSupportedInStreamingPlan( + "flatMapGroupsWithState - flatMapGroupsWithState(Update) " + + "on streaming relation without aggregation in update mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation), + outputMode = Update) + + assertNotSupportedInStreamingPlan( + "flatMapGroupsWithState - flatMapGroupsWithState(Update) " + + "on streaming relation without aggregation in append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation), + outputMode = Append, + expectedMsgs = Seq("flatMapGroupsWithState in update mode", "Append")) + + assertNotSupportedInStreamingPlan( + "flatMapGroupsWithState - flatMapGroupsWithState(Update) " + + "on streaming relation without aggregation in complete mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation), + outputMode = Complete, + // Disallowed by the aggregation check but let's still keep this test in case it's broken in + // future. + expectedMsgs = Seq("Complete")) + + // FlatMapGroupsWithState(Update) in streaming with aggregation + for (outputMode <- Seq(Append, Update, Complete)) { + assertNotSupportedInStreamingPlan( + "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " + + s"with aggregation in $outputMode mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), + outputMode = outputMode, + expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation")) + } + // FlatMapGroupsWithState(Append) in streaming without aggregation assertSupportedInStreamingPlan( - "mapGroupsWithState - mapGroupsWithState on streaming relation before aggregation", - MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), streamRelation), + "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + + "on streaming relation without aggregation in append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation), outputMode = Append) assertNotSupportedInStreamingPlan( - "mapGroupsWithState - mapGroupsWithState on streaming relation after aggregation", - MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), - Aggregate(Nil, aggExprs("c"), streamRelation)), + "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + + "on streaming relation without aggregation in update mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation), + outputMode = Update, + expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update")) + + // FlatMapGroupsWithState(Append) in streaming with aggregation + for (outputMode <- Seq(Append, Update, Complete)) { + assertSupportedInStreamingPlan( + "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + + s"on streaming relation before aggregation in $outputMode mode", + Aggregate( + Seq(attributeWithWatermark), + aggExprs("c"), + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)), + outputMode = outputMode) + } + + for (outputMode <- Seq(Append, Update)) { + assertNotSupportedInStreamingPlan( + "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + + s"on streaming relation after aggregation in $outputMode mode", + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, + Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), + outputMode = outputMode, + expectedMsgs = Seq("flatMapGroupsWithState", "after aggregation")) + } + + assertNotSupportedInStreamingPlan( + "flatMapGroupsWithState - " + + "flatMapGroupsWithState(Update) on streaming relation in complete mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation), outputMode = Complete, - expectedMsgs = Seq("(map/flatMap)GroupsWithState")) + // Disallowed by the aggregation check but let's still keep this test in case it's broken in + // future. + expectedMsgs = Seq("Complete")) + // FlatMapGroupsWithState inside batch relation should always be allowed + for (funcMode <- Seq(Append, Update)) { + for (outputMode <- Seq(Append, Update)) { // Complete is not supported without aggregation + assertSupportedInStreamingPlan( + s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation inside " + + s"streaming relation in $outputMode output mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation), + outputMode = outputMode + ) + } + } + + // multiple FlatMapGroupsWithStates assertSupportedInStreamingPlan( - "mapGroupsWithState - mapGroupsWithState on batch relation inside streaming relation", - MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation), - outputMode = Append - ) + "flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " + + "in append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)), + outputMode = Append) + + assertNotSupportedInStreamingPlan( + "flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" + + " are not in append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)), + outputMode = Append, + expectedMsgs = Seq("multiple flatMapGroupsWithState", "append")) + + // mapGroupsWithState + assertNotSupportedInStreamingPlan( + "mapGroupsWithState - mapGroupsWithState " + + "on streaming relation without aggregation in append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, + isMapGroupsWithState = true), + outputMode = Append, + // Disallowed by the aggregation check but let's still keep this test in case it's broken in + // future. + expectedMsgs = Seq("mapGroupsWithState", "append")) + + assertNotSupportedInStreamingPlan( + "mapGroupsWithState - mapGroupsWithState " + + "on streaming relation without aggregation in complete mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, + isMapGroupsWithState = true), + outputMode = Complete, + // Disallowed by the aggregation check but let's still keep this test in case it's broken in + // future. + expectedMsgs = Seq("Complete")) + + for (outputMode <- Seq(Append, Update, Complete)) { + assertNotSupportedInStreamingPlan( + "mapGroupsWithState - mapGroupsWithState on streaming relation " + + s"with aggregation in $outputMode mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation), + isMapGroupsWithState = true), + outputMode = outputMode, + expectedMsgs = Seq("mapGroupsWithState", "with aggregation")) + } + + // multiple mapGroupsWithStates + assertNotSupportedInStreamingPlan( + "mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are " + + "in append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, + isMapGroupsWithState = true), + isMapGroupsWithState = true), + outputMode = Append, + expectedMsgs = Seq("multiple mapGroupsWithStates")) + + // mixing mapGroupsWithStates and flatMapGroupsWithStates + assertNotSupportedInStreamingPlan( + "mapGroupsWithState - " + + "mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, + isMapGroupsWithState = false), + isMapGroupsWithState = true), + outputMode = Append, + expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates")) // Deduplicate assertSupportedInStreamingPlan( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala new file mode 100644 index 0000000000..201dac35ed --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModesSuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.streaming + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.streaming.OutputMode + +class InternalOutputModesSuite extends SparkFunSuite { + + test("supported strings") { + def testMode(outputMode: String, expected: OutputMode): Unit = { + assert(InternalOutputModes(outputMode) === expected) + } + + testMode("append", OutputMode.Append) + testMode("Append", OutputMode.Append) + testMode("complete", OutputMode.Complete) + testMode("Complete", OutputMode.Complete) + testMode("update", OutputMode.Update) + testMode("Update", OutputMode.Update) + } + + test("unsupported strings") { + def testMode(outputMode: String): Unit = { + val acceptedModes = Seq("append", "update", "complete") + val e = intercept[IllegalArgumentException](InternalOutputModes(outputMode)) + (Seq("output mode", "unknown", outputMode) ++ acceptedModes).foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + testMode("Xyz") + } +} |