aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiwei Lin <lwlin7@gmail.com>2016-04-12 11:50:51 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-04-12 11:50:51 -0700
commit852bbc6c0046d194fef0b6d0b99162ea2cc10286 (patch)
tree2a849626b5a4aeb4b534a76fa00a9bb6eb5a64a8
parent111a62474a2fb7f4e7f19fcfb8efaae37aa40400 (diff)
downloadspark-852bbc6c0046d194fef0b6d0b99162ea2cc10286.tar.gz
spark-852bbc6c0046d194fef0b6d0b99162ea2cc10286.tar.bz2
spark-852bbc6c0046d194fef0b6d0b99162ea2cc10286.zip
[SPARK-14556][SQL] Code clean-ups for package o.a.s.sql.execution.streaming.state
## What changes were proposed in this pull request? - `StateStoreConf.**max**DeltasForSnapshot` was renamed to `StateStoreConf.**min**DeltasForSnapshot` - some state switch checks were added - improved consistency between method names and string literals - other comments & typo fix ## How was this patch tested? N/A Author: Liwei Lin <lwlin7@gmail.com> Closes #12323 from lw-lin/streaming-state-clean-up.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala40
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala6
5 files changed, 31 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 1e0a4a5d4f..3335755fd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -46,12 +46,14 @@ import org.apache.spark.util.Utils
* Usage:
* To update the data in the state store, the following order of operations are needed.
*
- * - val store = StateStore.get(operatorId, partitionId, version) // to get the right store
- * - store.update(...)
+ * // get the right store
+ * - val store = StateStore.get(
+ * StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)
+ * - store.put(...)
* - store.remove(...)
- * - store.commit() // commits all the updates to made with version number
+ * - store.commit() // commits all the updates to made; the new version will be returned
* - store.iterator() // key-value data after last commit as an iterator
- * - store.updates() // updates made in the last as an iterator
+ * - store.updates() // updates made in the last commit as an iterator
*
* Fault-tolerance model:
* - Every set of updates is written to a delta file before committing.
@@ -99,7 +101,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
- verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+ verify(state == UPDATING, "Cannot remove after already committed or aborted")
val isNewKey = !mapToUpdate.containsKey(key)
mapToUpdate.put(key, value)
@@ -109,7 +111,7 @@ private[state] class HDFSBackedStateStoreProvider(
// Value did not exist in previous version and was added already, keep it marked as added
allUpdates.put(key, ValueAdded(key, value))
case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
- // Value existed in prev version and updated/removed, mark it as updated
+ // Value existed in previous version and updated/removed, mark it as updated
allUpdates.put(key, ValueUpdated(key, value))
case None =>
// There was no prior update, so mark this as added or updated according to its presence
@@ -122,7 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
/** Remove keys that match the following condition */
override def remove(condition: UnsafeRow => Boolean): Unit = {
- verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+ verify(state == UPDATING, "Cannot remove after already committed or aborted")
val keyIter = mapToUpdate.keySet().iterator()
while (keyIter.hasNext) {
val key = keyIter.next
@@ -146,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider(
/** Commit all the updates that have been made to the store, and return the new version. */
override def commit(): Long = {
- verify(state == UPDATING, "Cannot commit after already committed or cancelled")
+ verify(state == UPDATING, "Cannot commit after already committed or aborted")
try {
finalizeDeltaFile(tempDeltaFileStream)
@@ -161,8 +163,10 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
- /** Cancel all the updates made on this store. This store will not be usable any more. */
+ /** Abort all the updates made on this store. This store will not be usable any more. */
override def abort(): Unit = {
+ verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+
state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
@@ -170,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
fs.delete(tempDeltaFile, true)
}
- logInfo("Canceled ")
+ logInfo("Aborted")
}
/**
@@ -178,7 +182,8 @@ private[state] class HDFSBackedStateStoreProvider(
* This can be called only after committing all the updates made in the current thread.
*/
override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = {
- verify(state == COMMITTED, "Cannot get iterator of store data before committing")
+ verify(state == COMMITTED,
+ "Cannot get iterator of store data before committing or after aborting")
HDFSBackedStateStoreProvider.this.iterator(newVersion)
}
@@ -187,7 +192,8 @@ private[state] class HDFSBackedStateStoreProvider(
* This can be called only after committing all the updates made in the current thread.
*/
override def updates(): Iterator[StoreUpdate] = {
- verify(state == COMMITTED, "Cannot get iterator of updates before committing")
+ verify(state == COMMITTED,
+ "Cannot get iterator of updates before committing or after aborting")
allUpdates.values().asScala.toIterator
}
@@ -223,7 +229,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
override def toString(): String = {
- s"StateStore[id = (op=${id.operatorId},part=${id.partitionId}), dir = $baseDir]"
+ s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
}
/* Internal classes and methods */
@@ -277,7 +283,7 @@ private[state] class HDFSBackedStateStoreProvider(
} else {
if (!fs.isDirectory(baseDir)) {
throw new IllegalStateException(
- s"Cannot use ${id.checkpointLocation} for storing state data for $this as" +
+ s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
s"$baseDir already exists and is not a directory")
}
}
@@ -453,11 +459,11 @@ private[state] class HDFSBackedStateStoreProvider(
filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
synchronized { loadedMaps.get(lastVersion) } match {
case Some(map) =>
- if (deltaFilesForLastVersion.size > storeConf.maxDeltasForSnapshot) {
+ if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
writeSnapshotFile(lastVersion, map)
}
case None =>
- // The last map is not loaded, probably some other instance is incharge
+ // The last map is not loaded, probably some other instance is in charge
}
}
@@ -506,7 +512,6 @@ private[state] class HDFSBackedStateStoreProvider(
.lastOption
val deltaBatchFiles = latestSnapshotFileBeforeVersion match {
case Some(snapshotFile) =>
- val deltaBatchIds = (snapshotFile.version + 1) to version
val deltaFiles = allFiles.filter { file =>
file.version > snapshotFile.version && file.version <= version
@@ -579,4 +584,3 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
}
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 07f63f928b..cc5327e0e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.streaming.state
-import java.util.Timer
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
@@ -63,7 +62,7 @@ trait StateStore {
*/
def commit(): Long
- /** Cancel all the updates that have been made to the store. */
+ /** Abort all the updates that have been made to the store. */
def abort(): Unit
/**
@@ -109,8 +108,8 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
/**
* Companion object to [[StateStore]] that provides helper methods to create and retrieve stores
* by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null),
- * it also runs a periodic background tasks to do maintenance on the loaded stores. For each
- * store, tt uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
+ * it also runs a periodic background task to do maintenance on the loaded stores. For each
+ * store, it uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
* the store is the active instance. Accordingly, it either keeps it loaded and performs
* maintenance, or unloads the store.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index f0f1f3a1a8..e55f63a6c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -26,7 +26,7 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
import SQLConf._
- val maxDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+ val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
}
@@ -34,4 +34,3 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
private[streaming] object StateStoreConf {
val empty = new StateStoreConf()
}
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 812e1b0a39..e418217238 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -50,8 +50,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
private val endpointName = "StateStoreCoordinator"
/**
- * Create a reference to a [[StateStoreCoordinator]], This can be called from driver as well as
- * executors.
+ * Create a reference to a [[StateStoreCoordinator]]
*/
def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
try {
@@ -75,7 +74,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
}
/**
- * Reference to a [[StateStoreCoordinator]] that can be used to coordinator instances of
+ * Reference to a [[StateStoreCoordinator]] that can be used to coordinate instances of
* [[StateStore]]s across all the executors, and get their locations for job scheduling.
*/
private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
@@ -142,5 +141,3 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
context.reply(true)
}
}
-
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index df3d82c113..d708486d8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -22,12 +22,12 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
/**
* An RDD that allows computations to be executed against [[StateStore]]s. It
- * uses the [[StateStoreCoordinator]] to use the locations of loaded state stores as
- * preferred locations.
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
*/
class StateStoreRDD[T: ClassTag, U: ClassTag](
dataRDD: RDD[T],