diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala index d708486d8e..635bb86607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -37,13 +38,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( storeVersion: Long, keySchema: StructType, valueSchema: StructType, - storeConf: StateStoreConf, + sessionState: SessionState, @transient private val storeCoordinator: Option[StateStoreCoordinatorRef]) extends RDD[U](dataRDD) { + private val storeConf = new StateStoreConf(sessionState.conf) + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = dataRDD.context.broadcast( - new SerializableConfiguration(dataRDD.context.hadoopConfiguration)) + new SerializableConfiguration(sessionState.hadoopConf)) override protected def getPartitions: Array[Partition] = dataRDD.partitions |