aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala23
1 files changed, 8 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 3318660895..d708486d8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -22,12 +22,12 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
/**
* An RDD that allows computations to be executed against [[StateStore]]s. It
- * uses the [[StateStoreCoordinator]] to use the locations of loaded state stores as
- * preferred locations.
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
*/
class StateStoreRDD[T: ClassTag, U: ClassTag](
dataRDD: RDD[T],
@@ -54,17 +54,10 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
var store: StateStore = null
-
- Utils.tryWithSafeFinally {
- val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
- store = StateStore.get(
- storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value)
- val inputIter = dataRDD.iterator(partition, ctxt)
- val outputIter = storeUpdateFunction(store, inputIter)
- assert(store.hasCommitted)
- outputIter
- } {
- if (store != null) store.cancel()
- }
+ val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
+ store = StateStore.get(
+ storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value)
+ val inputIter = dataRDD.iterator(partition, ctxt)
+ storeUpdateFunction(store, inputIter)
}
}