aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java42
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala30
-rw-r--r--sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java29
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala80
5 files changed, 136 insertions, 47 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
new file mode 100644
index 0000000000..cf112f2e02
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
+
+/**
+ * Represents the type of timeouts possible for the Dataset operations
+ * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
+ * `KeyedState` for more details.
+ *
+ * The static factory methods below each return a Scala singleton object declared in
+ * `org.apache.spark.sql.catalyst.plans.logical`.
+ *
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class KeyedStateTimeout {
+
+ /** Returns the singleton timeout type for timeouts based on processing time. */
+ public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
+
+ /** Returns the singleton timeout type indicating that no timeout is used. */
+ public static KeyedStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 36bf3017d4..771ac28e51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -951,7 +951,7 @@ case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
override def eval(input: InternalRow): Any = {
val result = child.eval(input)
if (result == null) {
- throw new RuntimeException(errMsg);
+ throw new RuntimeException(errMsg)
}
result
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 7f4462e583..d1f95faf2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode }
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -353,6 +353,10 @@ case class MapGroups(
/** Internal class representing State */
trait LogicalKeyedState[S]
+/** Possible types of timeouts used in FlatMapGroupsWithState */
+case object NoTimeout extends KeyedStateTimeout
+case object ProcessingTimeTimeout extends KeyedStateTimeout
+
/** Factory for constructing new `FlatMapGroupsWithState` nodes. */
object FlatMapGroupsWithState {
def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
@@ -361,7 +365,10 @@ object FlatMapGroupsWithState {
dataAttributes: Seq[Attribute],
outputMode: OutputMode,
isMapGroupsWithState: Boolean,
+ timeout: KeyedStateTimeout,
child: LogicalPlan): LogicalPlan = {
+ val encoder = encoderFor[S]
+
val mapped = new FlatMapGroupsWithState(
func,
UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes),
@@ -369,11 +376,11 @@ object FlatMapGroupsWithState {
groupingAttributes,
dataAttributes,
CatalystSerde.generateObjAttr[U],
- encoderFor[S].resolveAndBind().deserializer,
- encoderFor[S].namedExpressions,
+ encoder.asInstanceOf[ExpressionEncoder[Any]],
outputMode,
- child,
- isMapGroupsWithState)
+ isMapGroupsWithState,
+ timeout,
+ child)
CatalystSerde.serialize[U](mapped)
}
}
@@ -384,15 +391,16 @@ object FlatMapGroupsWithState {
* Func is invoked with an object representation of the grouping key and an iterator containing
* the object representation of all the rows with that key.
*
+ * @param func function called on each group
* @param keyDeserializer used to extract the key object for each group.
* @param valueDeserializer used to extract the items in the iterator from an input row.
* @param groupingAttributes used to group the data
* @param dataAttributes used to read the data
* @param outputObjAttr used to define the output object
- * @param stateDeserializer used to deserialize state before calling `func`
- * @param stateSerializer used to serialize updated state after calling `func`
+ * @param stateEncoder used to deserialize state before and serialize state after calling `func`
* @param outputMode the output mode of `func`
* @param isMapGroupsWithState whether it is created by the `mapGroupsWithState` method
+ * @param timeout used to timeout groups that have not received data in a while
*/
case class FlatMapGroupsWithState(
func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
@@ -401,11 +409,11 @@ case class FlatMapGroupsWithState(
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
outputObjAttr: Attribute,
- stateDeserializer: Expression,
- stateSerializer: Seq[NamedExpression],
+ stateEncoder: ExpressionEncoder[Any],
outputMode: OutputMode,
- child: LogicalPlan,
- isMapGroupsWithState: Boolean = false) extends UnaryNode with ObjectProducer {
+ isMapGroupsWithState: Boolean = false,
+ timeout: KeyedStateTimeout,
+ child: LogicalPlan) extends UnaryNode with ObjectProducer {
if (isMapGroupsWithState) {
assert(outputMode == OutputMode.Update)
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
new file mode 100644
index 0000000000..02c94b0b32
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that the Java-facing factory methods on {@link KeyedStateTimeout} return the
+ * corresponding Scala singleton objects.
+ */
+public class JavaKeyedStateTimeoutSuite {
+
+  @Test
+  public void testTimeouts() {
+    // Use JUnit assertions rather than the Java `assert` statement: the latter is skipped
+    // unless the JVM runs with assertions enabled (-ea), so the test could pass vacuously.
+    Assert.assertSame(
+      ProcessingTimeTimeout$.MODULE$, KeyedStateTimeout.ProcessingTimeTimeout());
+    Assert.assertSame(NoTimeout$.MODULE$, KeyedStateTimeout.NoTimeout());
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 200c39f43a..08216e2660 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -144,14 +144,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertSupportedInBatchPlan(
s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation))
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, null,
+ batchRelation))
assertSupportedInBatchPlan(
s"flatMapGroupsWithState - multiple flatMapGroupsWithState($funcMode)s on batch relation",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode,
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation)))
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false,
+ null, batchRelation)))
}
// FlatMapGroupsWithState(Update) in streaming without aggregation
@@ -159,14 +161,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
"on streaming relation without aggregation in update mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Update)
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
"on streaming relation without aggregation in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Append,
expectedMsgs = Seq("flatMapGroupsWithState in update mode", "Append"))
@@ -174,7 +178,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Update) " +
"on streaming relation without aggregation in complete mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Complete,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -186,7 +191,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
s"with aggregation in $outputMode mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
@@ -197,14 +202,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
"on streaming relation without aggregation in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Append)
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
"on streaming relation without aggregation in update mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Update,
expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update"))
@@ -217,7 +224,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
Seq(attributeWithWatermark),
aggExprs("c"),
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation)),
outputMode = outputMode)
}
@@ -225,7 +233,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
s"on streaming relation after aggregation in $outputMode mode",
- FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append,
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
+ isMapGroupsWithState = false, null,
Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
expectedMsgs = Seq("flatMapGroupsWithState", "after aggregation"))
@@ -235,7 +244,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"flatMapGroupsWithState - " +
"flatMapGroupsWithState(Update) on streaming relation in complete mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation),
outputMode = Complete,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -248,7 +258,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
s"flatMapGroupsWithState - flatMapGroupsWithState($funcMode) on batch relation inside " +
s"streaming relation in $outputMode output mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), funcMode, batchRelation),
+ null, att, att, Seq(att), Seq(att), att, null, funcMode, isMapGroupsWithState = false,
+ null, batchRelation),
outputMode = outputMode
)
}
@@ -258,19 +269,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " +
"in append mode",
- FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append,
- FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
+ isMapGroupsWithState = false, null,
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
+ isMapGroupsWithState = false, null, streamRelation)),
outputMode = Append)
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" +
" are not in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Append, streamRelation)),
+ null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
+ streamRelation)),
outputMode = Append,
expectedMsgs = Seq("multiple flatMapGroupsWithState", "append"))
@@ -279,8 +291,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - mapGroupsWithState " +
"on streaming relation without aggregation in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
+ streamRelation),
outputMode = Append,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -290,8 +302,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - mapGroupsWithState " +
"on streaming relation without aggregation in complete mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
+ streamRelation),
outputMode = Complete,
// Disallowed by the aggregation check but let's still keep this test in case it's broken in
// future.
@@ -301,10 +313,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
assertNotSupportedInStreamingPlan(
"mapGroupsWithState - mapGroupsWithState on streaming relation " +
s"with aggregation in $outputMode mode",
- FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
- Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation),
- isMapGroupsWithState = true),
+ FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Update,
+ isMapGroupsWithState = true, null,
+ Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
expectedMsgs = Seq("mapGroupsWithState", "with aggregation"))
}
@@ -314,11 +325,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are " +
"in append mode",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = true),
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
+ streamRelation)),
outputMode = Append,
expectedMsgs = Seq("multiple mapGroupsWithStates"))
@@ -327,11 +337,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"mapGroupsWithState - " +
"mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation",
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update,
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = true, null,
FlatMapGroupsWithState(
- null, att, att, Seq(att), Seq(att), att, att, Seq(att), Update, streamRelation,
- isMapGroupsWithState = false),
- isMapGroupsWithState = true),
+ null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
+ streamRelation)
+ ),
outputMode = Append,
expectedMsgs = Seq("Mixing mapGroupsWithStates and flatMapGroupsWithStates"))