diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index ca5c864d9e..9521506325 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.streaming.state -import java.util.Timer import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable @@ -47,12 +46,11 @@ trait StateStore { /** Version of the data in this store before committing updates. */ def version: Long - /** - * Update the value of a key using the value generated by the update function. - * @note Do not mutate the retrieved value row as it will unexpectedly affect the previous - * versions of the store data. - */ - def update(key: UnsafeRow, updateFunc: Option[UnsafeRow] => UnsafeRow): Unit + /** Get the current value of a key. */ + def get(key: UnsafeRow): Option[UnsafeRow] + + /** Put a new value for a key. */ + def put(key: UnsafeRow, value: UnsafeRow): Unit /** * Remove keys that match the following condition. @@ -64,25 +62,25 @@ trait StateStore { */ def commit(): Long - /** Cancel all the updates that have been made to the store. */ - def cancel(): Unit + /** Abort all the updates that have been made to the store. */ + def abort(): Unit /** * Iterator of store data after a set of updates have been committed. - * This can be called only after commitUpdates() has been called in the current thread. + * This can be called only after committing all the updates made in the current thread. */ def iterator(): Iterator[(UnsafeRow, UnsafeRow)] /** * Iterator of the updates that have been committed. - * This can be called only after commitUpdates() has been called in the current thread. + * This can be called only after committing all the updates made in the current thread. */ def updates(): Iterator[StoreUpdate] /** * Whether all updates have been committed */ - def hasCommitted: Boolean + private[state] def hasCommitted: Boolean } @@ -110,8 +108,8 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate /** * Companion object to [[StateStore]] that provides helper methods to create and retrieve stores * by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null), - * it also runs a periodic background tasks to do maintenance on the loaded stores. For each - * store, tt uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of + * it also runs a periodic background task to do maintenance on the loaded stores. For each + * store, it uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of * the store is the active instance. Accordingly, it either keeps it loaded and performs * maintenance, or unloads the store. */ @@ -221,7 +219,7 @@ private[state] object StateStore extends Logging { val executorId = SparkEnv.get.blockManager.blockManagerId.executorId val verified = coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false) - logDebug(s"Verifyied whether the loaded instance $storeId is active: $verified" ) + logDebug(s"Verified whether the loaded instance $storeId is active: $verified" ) verified } catch { case NonFatal(e) => |