diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index de4305f564..d5e4dd8f78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -63,7 +63,7 @@ case class StateStoreRestoreExec( storeVersion = getStateId.batchId, keyExpressions.toStructType, child.output.toStructType, - new StateStoreConf(sqlContext.conf), + sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) iter.flatMap { row => @@ -92,7 +92,7 @@ case class StateStoreSaveExec( storeVersion = getStateId.batchId, keyExpressions.toStructType, child.output.toStructType, - new StateStoreConf(sqlContext.conf), + sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => new Iterator[InternalRow] { private[this] val baseIterator = iter |