aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index 9b6d0918e2..4914a9d722 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.StructType
package object state {
@@ -43,7 +44,7 @@ package object state {
storeVersion,
keySchema,
valueSchema,
- new StateStoreConf(sqlContext.conf),
+ sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator))(
storeUpdateFunction)
}
@@ -55,7 +56,7 @@ package object state {
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
- storeConf: StateStoreConf,
+ sessionState: SessionState,
storeCoordinator: Option[StateStoreCoordinatorRef])(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
@@ -67,7 +68,7 @@ package object state {
storeVersion,
keySchema,
valueSchema,
- storeConf,
+ sessionState,
storeCoordinator)
}
}