diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-31 10:58:43 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-31 10:58:43 -0700 |
commit | 567a50acfb0ae26bd430c290348886d494963696 (patch) | |
tree | b2a7be7b5c14cd5dd82cac3066881cd9069d28b5 /sql/core/src/main | |
parent | b2349e6a00d569851f0ca91a60e9299306208e92 (diff) | |
download | spark-567a50acfb0ae26bd430c290348886d494963696.tar.gz spark-567a50acfb0ae26bd430c290348886d494963696.tar.bz2 spark-567a50acfb0ae26bd430c290348886d494963696.zip |
[SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec
## What changes were proposed in this pull request?
- Encoder's deserializer must be resolved at the driver where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail.
- Fixed flaky test related to processing time timeout. The flakiness is caused because the test thread (that adds data to memory source) has a race condition with the streaming query thread. When testing the manual clock, the goal is to add data and increment clock together atomically, such that a trigger sees new data AND updated clock simultaneously (both or none). This fix adds additional synchronization in when adding data; it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before adding data.
- Added`testQuietly` on some tests that generate a lot of error logs.
## How was this patch tested?
Multiple runs on existing unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17488 from tdas/SPARK-20165.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index c7262ea972..e42df5dd61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec( val encSchemaAttribs = stateEncoder.schema.toAttributes if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs } + // Get the serializer for the state, taking into account whether we need to save timestamps + private val stateSerializer = { + val encoderSerializer = stateEncoder.namedExpressions + if (isTimeoutEnabled) { + encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP) + } else { + encoderSerializer + } + } + // Get the deserializer for the state. Note that this must be done in the driver, as + // resolving and binding of deserializer expressions to the encoded type can be safely done + // only in the driver. + private val stateDeserializer = stateEncoder.resolveAndBind().deserializer + /** Distribute by grouping attributes */ override def requiredChildDistribution: Seq[Distribution] = @@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec( ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes) private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType) - // Converter for translating state rows to Java objects + // Converters for translating state between rows and Java objects private val getStateObjFromRow = ObjectOperator.deserializeRowToObject( - stateEncoder.resolveAndBind().deserializer, stateAttributes) - - // Converter for translating state Java objects to rows - private val stateSerializer = { - val encoderSerializer = stateEncoder.namedExpressions - if (isTimeoutEnabled) { - encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP) - } else { - encoderSerializer - } - } + stateDeserializer, stateAttributes) private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer) // Index of the additional metadata fields in the state row |