aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index de4305f564..d5e4dd8f78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -63,7 +63,7 @@ case class StateStoreRestoreExec(
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
- new StateStoreConf(sqlContext.conf),
+ sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
@@ -92,7 +92,7 @@ case class StateStoreSaveExec(
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
- new StateStoreConf(sqlContext.conf),
+ sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
new Iterator[InternalRow] {
private[this] val baseIterator = iter