diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala | 23 |
1 files changed, 8 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala index 3318660895..d708486d8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala @@ -22,12 +22,12 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.StructType -import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.SerializableConfiguration /** * An RDD that allows computations to be executed against [[StateStore]]s. It - * uses the [[StateStoreCoordinator]] to use the locations of loaded state stores as - * preferred locations. + * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores + * and use that as the preferred locations. */ class StateStoreRDD[T: ClassTag, U: ClassTag]( dataRDD: RDD[T], @@ -54,17 +54,10 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = { var store: StateStore = null - - Utils.tryWithSafeFinally { - val storeId = StateStoreId(checkpointLocation, operatorId, partition.index) - store = StateStore.get( - storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value) - val inputIter = dataRDD.iterator(partition, ctxt) - val outputIter = storeUpdateFunction(store, inputIter) - assert(store.hasCommitted) - outputIter - } { - if (store != null) store.cancel() - } + val storeId = StateStoreId(checkpointLocation, operatorId, partition.index) + store = StateStore.get( + storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value) + val inputIter = dataRDD.iterator(partition, ctxt) + storeUpdateFunction(store, inputIter) } } |