From 567a50acfb0ae26bd430c290348886d494963696 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 31 Mar 2017 10:58:43 -0700 Subject: [SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec ## What changes were proposed in this pull request? - Encoder's deserializer must be resolved at the driver where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail. - Fixed flaky test related to processing time timeout. The flakiness is caused because the test thread (that adds data to memory source) has a race condition with the streaming query thread. When testing the manual clock, the goal is to add data and increment clock together atomically, such that a trigger sees new data AND updated clock simultaneously (both or none). This fix adds additional synchronization in when adding data; it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before adding data. - Added`testQuietly` on some tests that generate a lot of error logs. ## How was this patch tested? Multiple runs on existing unit tests Author: Tathagata Das Closes #17488 from tdas/SPARK-20165. --- .../streaming/FlatMapGroupsWithStateExec.scala | 28 ++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) (limited to 'sql/core/src/main') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index c7262ea972..e42df5dd61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec( val encSchemaAttribs = stateEncoder.schema.toAttributes if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs } + // Get the serializer for the state, taking into account whether we need to save timestamps + private val stateSerializer = { + val encoderSerializer = stateEncoder.namedExpressions + if (isTimeoutEnabled) { + encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP) + } else { + encoderSerializer + } + } + // Get the deserializer for the state. Note that this must be done in the driver, as + // resolving and binding of deserializer expressions to the encoded type can be safely done + // only in the driver. + private val stateDeserializer = stateEncoder.resolveAndBind().deserializer + /** Distribute by grouping attributes */ override def requiredChildDistribution: Seq[Distribution] = @@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec( ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes) private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType) - // Converter for translating state rows to Java objects + // Converters for translating state between rows and Java objects private val getStateObjFromRow = ObjectOperator.deserializeRowToObject( - stateEncoder.resolveAndBind().deserializer, stateAttributes) - - // Converter for translating state Java objects to rows - private val stateSerializer = { - val encoderSerializer = stateEncoder.namedExpressions - if (isTimeoutEnabled) { - encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP) - } else { - encoderSerializer - } - } + stateDeserializer, stateAttributes) private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer) // Index of the additional metadata fields in the state row -- cgit v1.2.3