aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala28
1 files changed, 16 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea972..e42df5dd61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
val encSchemaAttribs = stateEncoder.schema.toAttributes
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
+ // Get the serializer for the state, taking into account whether we need to save timestamps
+ private val stateSerializer = {
+ val encoderSerializer = stateEncoder.namedExpressions
+ if (isTimeoutEnabled) {
+ encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+ } else {
+ encoderSerializer
+ }
+ }
+ // Get the deserializer for the state. Note that this must be done in the driver, as
+ // resolving and binding of deserializer expressions to the encoded type can be safely done
+ // only in the driver.
+ private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
- // Converter for translating state rows to Java objects
+ // Converters for translating state between rows and Java objects
private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
- stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
- // Converter for translating state Java objects to rows
- private val stateSerializer = {
- val encoderSerializer = stateEncoder.namedExpressions
- if (isTimeoutEnabled) {
- encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
- } else {
- encoderSerializer
- }
- }
+ stateDeserializer, stateAttributes)
private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
// Index of the additional metadata fields in the state row