about | summary | refs | log | tree | commit | diff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala | 7
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index d708486d8e..635bb86607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -37,13 +38,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
- storeConf: StateStoreConf,
+ sessionState: SessionState,
@transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
extends RDD[U](dataRDD) {
+ private val storeConf = new StateStoreConf(sessionState.conf)
+
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = dataRDD.context.broadcast(
- new SerializableConfiguration(dataRDD.context.hadoopConfiguration))
+ new SerializableConfiguration(sessionState.hadoopConf))
override protected def getPartitions: Array[Partition] = dataRDD.partitions