diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala index 9b6d0918e2..4914a9d722 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.types.StructType package object state { @@ -43,7 +44,7 @@ package object state { storeVersion, keySchema, valueSchema, - new StateStoreConf(sqlContext.conf), + sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator))( storeUpdateFunction) } @@ -55,7 +56,7 @@ package object state { storeVersion: Long, keySchema: StructType, valueSchema: StructType, - storeConf: StateStoreConf, + sessionState: SessionState, storeCoordinator: Option[StateStoreCoordinatorRef])( storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = { val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction) @@ -67,7 +68,7 @@ package object state { storeVersion, keySchema, valueSchema, - storeConf, + sessionState, storeCoordinator) } } |