diff options
Diffstat (limited to 'sql/catalyst')
5 files changed, 136 insertions, 47 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java new file mode 100644 index 0000000000..cf112f2e02 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$; +import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout; +import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$; + +/** + * Represents the type of timeouts possible for the Dataset operations + * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on + * `KeyedState` for more details. + * + * @since 2.2.0 + */ +@Experimental +@InterfaceStability.Evolving +public class KeyedStateTimeout { + + /** Timeout based on processing time. */ + public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; } + + /** No timeout */ + public static KeyedStateTimeout NoTimeout() { return NoTimeout$.MODULE$; } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 36bf3017d4..771ac28e51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -951,7 +951,7 @@ case class AssertNotNull(child: Expression, walkedTypePath: Seq[String]) override def eval(input: InternalRow): Any = { val result = child.eval(input) if (result == null) { - throw new RuntimeException(errMsg); + throw new RuntimeException(errMsg) } result } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 7f4462e583..d1f95faf2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects.Invoke -import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode } import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -353,6 +353,10 @@ case class MapGroups( /** Internal class representing State */ trait LogicalKeyedState[S] +/** Possible types of timeouts used in FlatMapGroupsWithState */ +case object NoTimeout extends KeyedStateTimeout +case object ProcessingTimeTimeout extends KeyedStateTimeout + /** Factory for constructing new `MapGroupsWithState` nodes. */ object FlatMapGroupsWithState { def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder]( @@ -361,7 +365,10 @@ object FlatMapGroupsWithState { dataAttributes: Seq[Attribute], outputMode: OutputMode, isMapGroupsWithState: Boolean, + timeout: KeyedStateTimeout, child: LogicalPlan): LogicalPlan = { + val encoder = encoderFor[S] + val mapped = new FlatMapGroupsWithState( func, UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes), @@ -369,11 +376,11 @@ object FlatMapGroupsWithState { groupingAttributes, dataAttributes, CatalystSerde.generateObjAttr[U], - encoderFor[S].resolveAndBind().deserializer, - encoderFor[S].namedExpressions, + encoder.asInstanceOf[ExpressionEncoder[Any]], outputMode, - child, - isMapGroupsWithState) + isMapGroupsWithState, + timeout, + child) CatalystSerde.serialize[U](mapped) } } @@ -384,15 +391,16 @@ object FlatMapGroupsWithState { * Func is invoked with an object representation of the grouping key an iterator containing the * object representation of all the rows with that key. * + * @param func function called on each group * @param keyDeserializer used to extract the key object for each group. * @param valueDeserializer used to extract the items in the iterator from an input row. * @param groupingAttributes used to group the data * @param dataAttributes used to read the data * @param outputObjAttr used to define the output object - * @param stateDeserializer used to deserialize state before calling `func` - * @param stateSerializer used to serialize updated state after calling `func` + * @param stateEncoder used to serialize/deserialize state before calling `func` * @param outputMode the output mode of `func` * @param isMapGroupsWithState whether it is created by the `mapGroupsWithState` method + * @param timeout used to timeout groups that have not received data in a while */ case class FlatMapGroupsWithState( func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any], @@ -401,11 +409,11 @@ case class FlatMapGroupsWithState( groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], outputObjAttr: Attribute, - stateDeserializer: Expression, - stateSerializer: Seq[NamedExpression], + stateEncoder: ExpressionEncoder[Any], outputMode: OutputMode, - child: LogicalPlan, - isMapGroupsWithState: Boolean = false) extends UnaryNode with ObjectProducer { + isMapGroupsWithState: Boolean = false, + timeout: KeyedStateTimeout, + child: LogicalPlan) extends UnaryNode with ObjectProducer { if (isMapGroupsWithState) { assert(outputMode == OutputMode.Update) diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java new file mode 100644 index 0000000000..02c94b0b32 --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$; +import org.junit.Test; + +public class JavaKeyedStateTimeoutSuite { + + @Test + public void testTimeouts() { + assert(KeyedStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$); + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 200c39f43a..08216e2660 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -144,14 +144,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite { assertSupportedInBatchPlan( s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation)) + null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, null, + batchRelation)) assertSupportedInBatchPlan( s"flatMapGroupsWithState - multiple flatMapGroupsWithState($funcMode)s on batch relation", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, + null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, null, FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation))) + null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, + null, batchRelation))) } // FlatMapGroupsWithState(Update) in streaming without aggregation @@ -159,14 +161,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "flatMapGroupsWithState - flatMapGroupsWithState(Update) " + "on streaming relation without aggregation in update mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, + streamRelation), outputMode = Update) assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Update) " + "on streaming relation without aggregation in append mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, + streamRelation), outputMode = Append, expectedMsgs = Seq("flatMapGroupsWithState in update mode", "Append")) @@ -174,7 +178,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "flatMapGroupsWithState - flatMapGroupsWithState(Update) " + "on streaming relation without aggregation in complete mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, + streamRelation), outputMode = Complete, // Disallowed by the aggregation check but let's still keep this test in case it's broken in // future. @@ -186,7 +191,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " + s"with aggregation in $outputMode mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), outputMode = outputMode, expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation")) @@ -197,14 +202,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + "on streaming relation without aggregation in append mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation), + null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, + streamRelation), outputMode = Append) assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + "on streaming relation without aggregation in update mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation), + null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, + streamRelation), outputMode = Update, expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update")) @@ -217,7 +224,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { Seq(attributeWithWatermark), aggExprs("c"), FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)), + null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, + streamRelation)), outputMode = outputMode) } @@ -225,7 +233,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + s"on streaming relation after aggregation in $outputMode mode", - FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), outputMode = outputMode, expectedMsgs = Seq("flatMapGroupsWithState", "after aggregation")) @@ -235,7 +244,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "flatMapGroupsWithState - " + "flatMapGroupsWithState(Update) on streaming relation in complete mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation), + null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, + streamRelation), outputMode = Complete, // Disallowed by the aggregation check but let's still keep this test in case it's broken in // future. @@ -248,7 +258,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation inside " + s"streaming relation in $outputMode output mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation), + null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, + null, batchRelation), outputMode = outputMode ) } @@ -258,19 +269,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite { assertSupportedInStreamingPlan( "flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " + "in append mode", - FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, - FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)), + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, streamRelation)), outputMode = Append) assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" + " are not in append mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)), + null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, + streamRelation)), outputMode = Append, expectedMsgs = Seq("multiple flatMapGroupsWithState", "append")) @@ -279,8 +291,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "mapGroupsWithState - mapGroupsWithState " + "on streaming relation without aggregation in append mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, - isMapGroupsWithState = true), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null, + streamRelation), outputMode = Append, // Disallowed by the aggregation check but let's still keep this test in case it's broken in // future. @@ -290,8 +302,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "mapGroupsWithState - mapGroupsWithState " + "on streaming relation without aggregation in complete mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, - isMapGroupsWithState = true), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null, + streamRelation), outputMode = Complete, // Disallowed by the aggregation check but let's still keep this test in case it's broken in // future. @@ -301,10 +313,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite { assertNotSupportedInStreamingPlan( "mapGroupsWithState - mapGroupsWithState on streaming relation " + s"with aggregation in $outputMode mode", - FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, - Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation), - isMapGroupsWithState = true), + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Update, + isMapGroupsWithState = true, null, + Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)), outputMode = outputMode, expectedMsgs = Seq("mapGroupsWithState", "with aggregation")) } @@ -314,11 +325,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are " + "in append mode", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null, FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, - isMapGroupsWithState = true), - isMapGroupsWithState = true), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null, + streamRelation)), outputMode = Append, expectedMsgs = Seq("multiple mapGroupsWithStates")) @@ -327,11 +337,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "mapGroupsWithState - " + "mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation", FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null, FlatMapGroupsWithState( - null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation, - isMapGroupsWithState = false), - isMapGroupsWithState = true), + null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null, + streamRelation) + ), outputMode = Append, expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates")) |