From 852bbc6c0046d194fef0b6d0b99162ea2cc10286 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Tue, 12 Apr 2016 11:50:51 -0700
Subject: [SPARK-14556][SQL] Code clean-ups for package o.a.s.sql.execution.streaming.state

## What changes were proposed in this pull request?

- `StateStoreConf.**max**DeltasForSnapshot` was renamed to `StateStoreConf.**min**DeltasForSnapshot`
- some state switch checks were added
- improved consistency between method names and string literals
- other comments & typo fix

## How was this patch tested?

N/A

Author: Liwei Lin

Closes #12323 from lw-lin/streaming-state-clean-up.
---
 .../state/HDFSBackedStateStoreProvider.scala        | 40 ++++++++++++----------
 .../sql/execution/streaming/state/StateStore.scala  |  7 ++--
 .../execution/streaming/state/StateStoreConf.scala  |  3 +-
 .../streaming/state/StateStoreCoordinator.scala     |  7 ++--
 .../execution/streaming/state/StateStoreRDD.scala   |  6 ++--
 5 files changed, 31 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 1e0a4a5d4f..3335755fd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -46,12 +46,14 @@ import org.apache.spark.util.Utils
  * Usage:
  * To update the data in the state store, the following order of operations are needed.
  *
- * - val store = StateStore.get(operatorId, partitionId, version) // to get the right store
- * - store.update(...)
+ * // get the right store
+ * - val store = StateStore.get(
+ *      StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)
+ * - store.put(...)
  * - store.remove(...)
- * - store.commit()    // commits all the updates to made with version number
+ * - store.commit()    // commits all the updates to made; the new version will be returned
  * - store.iterator()  // key-value data after last commit as an iterator
- * - store.updates()   // updates made in the last as an iterator
+ * - store.updates()   // updates made in the last commit as an iterator
  *
  * Fault-tolerance model:
  * - Every set of updates is written to a delta file before committing.
@@ -99,7 +101,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+      verify(state == UPDATING, "Cannot remove after already committed or aborted")
       val isNewKey = !mapToUpdate.containsKey(key)
       mapToUpdate.put(key, value)
 
@@ -109,7 +111,7 @@ private[state] class HDFSBackedStateStoreProvider(
           // Value did not exist in previous version and was added already, keep it marked as added
           allUpdates.put(key, ValueAdded(key, value))
         case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
-          // Value existed in prev version and updated/removed, mark it as updated
+          // Value existed in previous version and updated/removed, mark it as updated
          allUpdates.put(key, ValueUpdated(key, value))
         case None =>
           // There was no prior update, so mark this as added or updated according to its presence
@@ -122,7 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
     /** Remove keys that match the following condition */
     override def remove(condition: UnsafeRow => Boolean): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+      verify(state == UPDATING, "Cannot remove after already committed or aborted")
       val keyIter = mapToUpdate.keySet().iterator()
       while (keyIter.hasNext) {
         val key = keyIter.next
@@ -146,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
     /** Commit all the updates that have been made to the store, and return the new version. */
     override def commit(): Long = {
-      verify(state == UPDATING, "Cannot commit after already committed or cancelled")
+      verify(state == UPDATING, "Cannot commit after already committed or aborted")
 
       try {
         finalizeDeltaFile(tempDeltaFileStream)
@@ -161,8 +163,10 @@ private[state] class HDFSBackedStateStoreProvider(
       }
     }
 
-    /** Cancel all the updates made on this store. This store will not be usable any more. */
+    /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
+      verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      state = ABORTED
       if (tempDeltaFileStream != null) {
         tempDeltaFileStream.close()
 
@@ -170,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
         fs.delete(tempDeltaFile, true)
       }
-      logInfo("Canceled ")
+      logInfo("Aborted")
     }
 
     /**
@@ -178,7 +182,8 @@ private[state] class HDFSBackedStateStoreProvider(
      * This can be called only after committing all the updates made in the current thread.
      */
     override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = {
-      verify(state == COMMITTED, "Cannot get iterator of store data before committing")
+      verify(state == COMMITTED,
+        "Cannot get iterator of store data before committing or after aborting")
       HDFSBackedStateStoreProvider.this.iterator(newVersion)
     }
 
@@ -187,7 +192,8 @@ private[state] class HDFSBackedStateStoreProvider(
      * This can be called only after committing all the updates made in the current thread.
      */
     override def updates(): Iterator[StoreUpdate] = {
-      verify(state == COMMITTED, "Cannot get iterator of updates before committing")
+      verify(state == COMMITTED,
+        "Cannot get iterator of updates before committing or after aborting")
       allUpdates.values().asScala.toIterator
     }
 
@@ -223,7 +229,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
 
     override def toString(): String = {
-      s"StateStore[id = (op=${id.operatorId},part=${id.partitionId}), dir = $baseDir]"
+      s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
     }
 
   /* Internal classes and methods */
@@ -277,7 +283,7 @@ private[state] class HDFSBackedStateStoreProvider(
     } else {
       if (!fs.isDirectory(baseDir)) {
         throw new IllegalStateException(
-          s"Cannot use ${id.checkpointLocation} for storing state data for $this as" +
+          s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
           s"$baseDir already exists and is not a directory")
       }
     }
@@ -453,11 +459,11 @@ private[state] class HDFSBackedStateStoreProvider(
         filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
       synchronized { loadedMaps.get(lastVersion) } match {
         case Some(map) =>
-          if (deltaFilesForLastVersion.size > storeConf.maxDeltasForSnapshot) {
+          if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
             writeSnapshotFile(lastVersion, map)
           }
         case None =>
-          // The last map is not loaded, probably some other instance is incharge
+          // The last map is not loaded, probably some other instance is in charge
       }
     }
 
@@ -506,7 +512,6 @@ private[state] class HDFSBackedStateStoreProvider(
         .lastOption
       val deltaBatchFiles = latestSnapshotFileBeforeVersion match {
         case Some(snapshotFile) =>
-          val deltaBatchIds = (snapshotFile.version + 1) to version
 
           val deltaFiles = allFiles.filter { file =>
             file.version > snapshotFile.version && file.version <= version
@@ -579,4 +584,3 @@ private[state] class HDFSBackedStateStoreProvider(
     }
   }
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 07f63f928b..cc5327e0e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.util.Timer
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.collection.mutable
@@ -63,7 +62,7 @@ trait StateStore {
    */
   def commit(): Long
 
-  /** Cancel all the updates that have been made to the store. */
+  /** Abort all the updates that have been made to the store. */
   def abort(): Unit
 
   /**
@@ -109,8 +108,8 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
 /**
  * Companion object to [[StateStore]] that provides helper methods to create and retrieve stores
  * by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null),
- * it also runs a periodic background tasks to do maintenance on the loaded stores. For each
- * store, tt uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
+ * it also runs a periodic background task to do maintenance on the loaded stores. For each
+ * store, it uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
  * the store is the active instance. Accordingly, it either keeps it loaded and performs
  * maintenance, or unloads the store.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index f0f1f3a1a8..e55f63a6c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -26,7 +26,7 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   import SQLConf._
 
-  val maxDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+  val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
   val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
 }
@@ -34,4 +34,3 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 private[streaming] object StateStoreConf {
   val empty = new StateStoreConf()
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 812e1b0a39..e418217238 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -50,8 +50,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
   private val endpointName = "StateStoreCoordinator"
 
   /**
-   * Create a reference to a [[StateStoreCoordinator]], This can be called from driver as well as
-   * executors.
+   * Create a reference to a [[StateStoreCoordinator]]
    */
   def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
     try {
@@ -75,7 +74,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
 }
 
 /**
- * Reference to a [[StateStoreCoordinator]] that can be used to coordinator instances of
+ * Reference to a [[StateStoreCoordinator]] that can be used to coordinate instances of
  * [[StateStore]]s across all the executors, and get their locations for job scheduling.
  */
 private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
@@ -142,5 +141,3 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
       context.reply(true)
   }
 }
-
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index df3d82c113..d708486d8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -22,12 +22,12 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * An RDD that allows computations to be executed against [[StateStore]]s. It
- * uses the [[StateStoreCoordinator]] to use the locations of loaded state stores as
- * preferred locations.
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
  */
 class StateStoreRDD[T: ClassTag, U: ClassTag](
     dataRDD: RDD[T],
-- 
cgit v1.2.3
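
The store life cycle that the added `verify` calls enforce (UPDATING, then either COMMITTED or ABORTED) can be illustrated outside of Spark. Below is a minimal, self-contained sketch of that state machine and of the error messages this patch aligns; `SimpleStateStore`, its `State` objects, and `main` are hypothetical stand-ins for illustration only, not Spark's `StateStore` API.

```scala
// Illustrative sketch only: a tiny in-memory stand-in for the UPDATING -> COMMITTED/ABORTED
// life cycle documented by the patch. Not part of Spark.
object StateStoreLifecycleSketch {

  sealed trait State
  case object Updating extends State
  case object Committed extends State
  case object Aborted extends State

  final class SimpleStateStore(val version: Long) {
    private val map = scala.collection.mutable.HashMap.empty[String, String]
    private var state: State = Updating

    // Mirrors the patch's verify(...) guards: fail fast on an illegal state transition.
    private def verify(condition: Boolean, msg: String): Unit =
      if (!condition) throw new IllegalStateException(msg)

    def put(key: String, value: String): Unit = {
      verify(state == Updating, "Cannot put after already committed or aborted")
      map.put(key, value)
    }

    def remove(condition: String => Boolean): Unit = {
      verify(state == Updating, "Cannot remove after already committed or aborted")
      map.keys.filter(condition).toList.foreach(map.remove)
    }

    /** Commit all updates and return the new version. */
    def commit(): Long = {
      verify(state == Updating, "Cannot commit after already committed or aborted")
      state = Committed
      version + 1
    }

    /** Abort all updates; aborting twice is tolerated, aborting after a commit is not. */
    def abort(): Unit = {
      verify(state == Updating || state == Aborted, "Cannot abort after already committed")
      state = Aborted
    }

    /** Key-value data after the commit, as an iterator. */
    def iterator(): Iterator[(String, String)] = {
      verify(state == Committed,
        "Cannot get iterator of store data before committing or after aborting")
      map.iterator
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new SimpleStateStore(version = 0)
    store.put("a", "1")
    store.remove(_ == "stale-key")
    val newVersion = store.commit()   // returns the new version, as the updated scaladoc notes
    store.iterator().foreach(println)
    println(s"committed version $newVersion")
  }
}
```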
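
The rename from `maxDeltasForSnapshot` to `minDeltasForSnapshot` matches how the value is used in `HDFSBackedStateStoreProvider`: a snapshot is written once the number of delta files since the last snapshot exceeds the configured minimum, so the setting is a lower bound rather than an upper bound. A small sketch of that threshold check follows; `deltaFileCount` and `minDeltasForSnapshot` here are hypothetical local values, not Spark configuration objects.

```scala
// Illustrative sketch only: the snapshot-threshold decision implied by the rename.
object SnapshotThresholdSketch {
  def shouldSnapshot(deltaFileCount: Int, minDeltasForSnapshot: Int): Boolean =
    // Snapshot once the delta files accumulated since the last snapshot exceed the minimum.
    deltaFileCount > minDeltasForSnapshot

  def main(args: Array[String]): Unit = {
    println(shouldSnapshot(deltaFileCount = 3, minDeltasForSnapshot = 10))   // false: too few deltas
    println(shouldSnapshot(deltaFileCount = 12, minDeltasForSnapshot = 10))  // true: time to snapshot
  }
}
```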