aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-03-31 10:58:43 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-03-31 10:58:43 -0700
commit567a50acfb0ae26bd430c290348886d494963696 (patch)
treeb2a7be7b5c14cd5dd82cac3066881cd9069d28b5 /sql/core/src/main
parentb2349e6a00d569851f0ca91a60e9299306208e92 (diff)
downloadspark-567a50acfb0ae26bd430c290348886d494963696.tar.gz
spark-567a50acfb0ae26bd430c290348886d494963696.tar.bz2
spark-567a50acfb0ae26bd430c290348886d494963696.zip
[SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec
## What changes were proposed in this pull request? - Encoder's deserializer must be resolved at the driver where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail. - Fixed flaky test related to processing time timeout. The flakiness is caused because the test thread (that adds data to memory source) has a race condition with the streaming query thread. When testing the manual clock, the goal is to add data and increment clock together atomically, such that a trigger sees new data AND updated clock simultaneously (both or none). This fix adds additional synchronization when adding data; it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before adding data. - Added `testQuietly` on some tests that generate a lot of error logs. ## How was this patch tested? Multiple runs on existing unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17488 from tdas/SPARK-20165.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala28
1 file changed, 16 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea972..e42df5dd61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
val encSchemaAttribs = stateEncoder.schema.toAttributes
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
+ // Get the serializer for the state, taking into account whether we need to save timestamps
+ private val stateSerializer = {
+ val encoderSerializer = stateEncoder.namedExpressions
+ if (isTimeoutEnabled) {
+ encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+ } else {
+ encoderSerializer
+ }
+ }
+ // Get the deserializer for the state. Note that this must be done in the driver, as
+ // resolving and binding of deserializer expressions to the encoded type can be safely done
+ // only in the driver.
+ private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
- // Converter for translating state rows to Java objects
+ // Converters for translating state between rows and Java objects
private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
- stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
- // Converter for translating state Java objects to rows
- private val stateSerializer = {
- val encoderSerializer = stateEncoder.namedExpressions
- if (isTimeoutEnabled) {
- encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
- } else {
- encoderSerializer
- }
- }
+ stateDeserializer, stateAttributes)
private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
// Index of the additional metadata fields in the state row