aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala21
1 files changed, 10 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index b249e37921..9b6d0918e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -28,37 +28,36 @@ package object state {
implicit class StateStoreOps[T: ClassTag](dataRDD: RDD[T]) {
/** Map each partition of a RDD along with data in a [[StateStore]]. */
- def mapPartitionWithStateStore[U: ClassTag](
- storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U],
+ def mapPartitionsWithStateStore[U: ClassTag](
+ sqlContext: SQLContext,
checkpointLocation: String,
operatorId: Long,
storeVersion: Long,
keySchema: StructType,
- valueSchema: StructType
- )(implicit sqlContext: SQLContext): StateStoreRDD[T, U] = {
+ valueSchema: StructType)(
+ storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
- mapPartitionWithStateStore(
- storeUpdateFunction,
+ mapPartitionsWithStateStore(
checkpointLocation,
operatorId,
storeVersion,
keySchema,
valueSchema,
new StateStoreConf(sqlContext.conf),
- Some(sqlContext.streams.stateStoreCoordinator))
+ Some(sqlContext.streams.stateStoreCoordinator))(
+ storeUpdateFunction)
}
/** Map each partition of a RDD along with data in a [[StateStore]]. */
- private[state] def mapPartitionWithStateStore[U: ClassTag](
- storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U],
+ private[streaming] def mapPartitionsWithStateStore[U: ClassTag](
checkpointLocation: String,
operatorId: Long,
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
storeConf: StateStoreConf,
- storeCoordinator: Option[StateStoreCoordinatorRef]
- ): StateStoreRDD[T, U] = {
+ storeCoordinator: Option[StateStoreCoordinatorRef])(
+ storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
new StateStoreRDD(
dataRDD,