aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala28
1 files changed, 13 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index ca5c864d9e..9521506325 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.streaming.state
-import java.util.Timer
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
@@ -47,12 +46,11 @@ trait StateStore {
/** Version of the data in this store before committing updates. */
def version: Long
- /**
- * Update the value of a key using the value generated by the update function.
- * @note Do not mutate the retrieved value row as it will unexpectedly affect the previous
- * versions of the store data.
- */
- def update(key: UnsafeRow, updateFunc: Option[UnsafeRow] => UnsafeRow): Unit
+ /** Get the current value of a key. */
+ def get(key: UnsafeRow): Option[UnsafeRow]
+
+ /** Put a new value for a key. */
+ def put(key: UnsafeRow, value: UnsafeRow): Unit
/**
* Remove keys that match the following condition.
@@ -64,25 +62,25 @@ trait StateStore {
*/
def commit(): Long
- /** Cancel all the updates that have been made to the store. */
- def cancel(): Unit
+ /** Abort all the updates that have been made to the store. */
+ def abort(): Unit
/**
* Iterator of store data after a set of updates have been committed.
- * This can be called only after commitUpdates() has been called in the current thread.
+ * This can be called only after committing all the updates made in the current thread.
*/
def iterator(): Iterator[(UnsafeRow, UnsafeRow)]
/**
* Iterator of the updates that have been committed.
- * This can be called only after commitUpdates() has been called in the current thread.
+ * This can be called only after committing all the updates made in the current thread.
*/
def updates(): Iterator[StoreUpdate]
/**
* Whether all updates have been committed
*/
- def hasCommitted: Boolean
+ private[state] def hasCommitted: Boolean
}
@@ -110,8 +108,8 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
/**
* Companion object to [[StateStore]] that provides helper methods to create and retrieve stores
* by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null),
- * it also runs a periodic background tasks to do maintenance on the loaded stores. For each
- * store, tt uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
+ * it also runs a periodic background task to do maintenance on the loaded stores. For each
+ * store, it uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
* the store is the active instance. Accordingly, it either keeps it loaded and performs
* maintenance, or unloads the store.
*/
@@ -221,7 +219,7 @@ private[state] object StateStore extends Logging {
val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
val verified =
coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
- logDebug(s"Verifyied whether the loaded instance $storeId is active: $verified" )
+ logDebug(s"Verified whether the loaded instance $storeId is active: $verified" )
verified
} catch {
case NonFatal(e) =>