Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 40
1 file changed, 22 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 1e0a4a5d4f..3335755fd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -46,12 +46,14 @@ import org.apache.spark.util.Utils
* Usage:
* To update the data in the state store, the following order of operations are needed.
*
- * - val store = StateStore.get(operatorId, partitionId, version) // to get the right store
- * - store.update(...)
+ * // get the right store
+ * - val store = StateStore.get(
+ * StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)
+ * - store.put(...)
* - store.remove(...)
- * - store.commit() // commits all the updates to made with version number
+ * - store.commit() // commits all the updates made; the new version will be returned
* - store.iterator() // key-value data after last commit as an iterator
- * - store.updates() // updates made in the last as an iterator
+ * - store.updates() // updates made in the last commit as an iterator
*
* Fault-tolerance model:
* - Every set of updates is written to a delta file before committing.
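The updated usage comment above walks through the put/remove/commit/iterate lifecycle. Below is a minimal sketch of that cycle against a hypothetical, simplified store trait (String keys and values standing in for UnsafeRow, and none of the extra parameters elided with "..." in the comment); it is not the real Spark API, only an illustration of the documented order of operations.

object StateStoreUsageSketch {
  // Hypothetical stand-in for the StateStore interface described in the usage comment.
  trait SimpleStateStore {
    def put(key: String, value: String): Unit
    def remove(condition: String => Boolean): Unit
    def commit(): Long                           // returns the new version
    def iterator(): Iterator[(String, String)]   // key-value data after the last commit
  }

  def runUpdateCycle(store: SimpleStateStore): Long = {
    store.put("user-1", "state-a")         // add or overwrite a key
    store.remove(_.startsWith("expired"))  // drop keys matching a condition
    val newVersion = store.commit()        // all updates become visible as one new version
    store.iterator().foreach { case (k, v) => println(s"$k -> $v") }
    newVersion
  }
}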
@@ -99,7 +101,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
- verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+ verify(state == UPDATING, "Cannot remove after already committed or aborted")
val isNewKey = !mapToUpdate.containsKey(key)
mapToUpdate.put(key, value)
@@ -109,7 +111,7 @@ private[state] class HDFSBackedStateStoreProvider(
// Value did not exist in previous version and was added already, keep it marked as added
allUpdates.put(key, ValueAdded(key, value))
case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
- // Value existed in prev version and updated/removed, mark it as updated
+ // Value existed in previous version and updated/removed, mark it as updated
allUpdates.put(key, ValueUpdated(key, value))
case None =>
// There was no prior update, so mark this as added or updated according to its presence
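The case analysis in this hunk decides how a new put folds into the set of updates already pending for the current batch, so that each key ends up with a single StoreUpdate per commit. A simplified model of that folding rule, with String keys and values standing in for UnsafeRow (the subtypes mirror the ones referenced above, but this is only a sketch, not the provider's code):

sealed trait Update
case class ValueAdded(key: String, value: String) extends Update
case class ValueUpdated(key: String, value: String) extends Update
case class KeyRemoved(key: String) extends Update

object UpdateMerge {
  // Fold a new put into whatever update is already pending for this key in the batch.
  def mergePut(prior: Option[Update], key: String, value: String, isNewKey: Boolean): Update =
    prior match {
      // Key was absent in the previous version and already added in this batch: keep it "added".
      case Some(ValueAdded(_, _)) => ValueAdded(key, value)
      // Key existed in the previous version and was updated/removed here: record an update.
      case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) => ValueUpdated(key, value)
      // First touch of this key in the batch: added if brand new, updated otherwise.
      case None => if (isNewKey) ValueAdded(key, value) else ValueUpdated(key, value)
    }
}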
@@ -122,7 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
/** Remove keys that match the following condition */
override def remove(condition: UnsafeRow => Boolean): Unit = {
- verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+ verify(state == UPDATING, "Cannot remove after already committed or aborted")
val keyIter = mapToUpdate.keySet().iterator()
while (keyIter.hasNext) {
val key = keyIter.next
@@ -146,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider(
/** Commit all the updates that have been made to the store, and return the new version. */
override def commit(): Long = {
- verify(state == UPDATING, "Cannot commit after already committed or cancelled")
+ verify(state == UPDATING, "Cannot commit after already committed or aborted")
try {
finalizeDeltaFile(tempDeltaFileStream)
@@ -161,8 +163,10 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
- /** Cancel all the updates made on this store. This store will not be usable any more. */
+ /** Abort all the updates made on this store. This store will not be usable any more. */
override def abort(): Unit = {
+ verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+
state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
@@ -170,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
fs.delete(tempDeltaFile, true)
}
- logInfo("Canceled ")
+ logInfo("Aborted")
}
/**
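Taken together with the commit(), iterator() and updates() hunks, these verify calls enforce a small state machine: UPDATING can move to COMMITTED or ABORTED, abort is tolerated when already aborted, and reads are allowed only after a commit. A rough, self-contained sketch of that guard logic under a simplified state type (not the provider's actual fields):

object LifecycleSketch {
  object StoreState extends Enumeration { val UPDATING, COMMITTED, ABORTED = Value }

  class LifecycleGuard {
    private var state = StoreState.UPDATING

    private def verify(condition: Boolean, msg: String): Unit =
      if (!condition) throw new IllegalStateException(msg)

    def commit(): Unit = {
      verify(state == StoreState.UPDATING, "Cannot commit after already committed or aborted")
      state = StoreState.COMMITTED
    }

    def abort(): Unit = {
      // Aborting an already-aborted store is allowed; aborting after commit is not.
      verify(state == StoreState.UPDATING || state == StoreState.ABORTED,
        "Cannot abort after already committed")
      state = StoreState.ABORTED
    }

    def iterator(): Unit =
      verify(state == StoreState.COMMITTED,
        "Cannot get iterator of store data before committing or after aborting")
  }
}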
@@ -178,7 +182,8 @@ private[state] class HDFSBackedStateStoreProvider(
* This can be called only after committing all the updates made in the current thread.
*/
override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = {
- verify(state == COMMITTED, "Cannot get iterator of store data before committing")
+ verify(state == COMMITTED,
+ "Cannot get iterator of store data before committing or after aborting")
HDFSBackedStateStoreProvider.this.iterator(newVersion)
}
@@ -187,7 +192,8 @@ private[state] class HDFSBackedStateStoreProvider(
* This can be called only after committing all the updates made in the current thread.
*/
override def updates(): Iterator[StoreUpdate] = {
- verify(state == COMMITTED, "Cannot get iterator of updates before committing")
+ verify(state == COMMITTED,
+ "Cannot get iterator of updates before committing or after aborting")
allUpdates.values().asScala.toIterator
}
@@ -223,7 +229,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
override def toString(): String = {
- s"StateStore[id = (op=${id.operatorId},part=${id.partitionId}), dir = $baseDir]"
+ s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
}
/* Internal classes and methods */
@@ -277,7 +283,7 @@ private[state] class HDFSBackedStateStoreProvider(
} else {
if (!fs.isDirectory(baseDir)) {
throw new IllegalStateException(
- s"Cannot use ${id.checkpointLocation} for storing state data for $this as" +
+ s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
s"$baseDir already exists and is not a directory")
}
}
@@ -453,11 +459,11 @@ private[state] class HDFSBackedStateStoreProvider(
filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
synchronized { loadedMaps.get(lastVersion) } match {
case Some(map) =>
- if (deltaFilesForLastVersion.size > storeConf.maxDeltasForSnapshot) {
+ if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
writeSnapshotFile(lastVersion, map)
}
case None =>
- // The last map is not loaded, probably some other instance is incharge
+ // The last map is not loaded, probably some other instance is in charge
}
}
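The renamed configuration makes the intent of this hunk clearer: a snapshot is written only once enough delta files have accumulated since the last one, and only by an instance that actually has the latest version loaded. A small sketch of that decision under assumed simplified types (the real code works with loaded in-memory maps and Hadoop paths):

case class StoreFile(version: Long, isSnapshot: Boolean)

object SnapshotPolicy {
  // Write a snapshot for the latest version only if more than minDeltasForSnapshot
  // delta files have accumulated, and only if this instance has that version loaded
  // (otherwise another provider instance is probably in charge of it).
  def shouldSnapshot(filesForLastVersion: Seq[StoreFile],
                     lastVersionLoaded: Boolean,
                     minDeltasForSnapshot: Int): Boolean = {
    val deltaCount = filesForLastVersion.count(!_.isSnapshot)
    lastVersionLoaded && deltaCount > minDeltasForSnapshot
  }
}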
@@ -506,7 +512,6 @@ private[state] class HDFSBackedStateStoreProvider(
.lastOption
val deltaBatchFiles = latestSnapshotFileBeforeVersion match {
case Some(snapshotFile) =>
- val deltaBatchIds = (snapshotFile.version + 1) to version
val deltaFiles = allFiles.filter { file =>
file.version > snapshotFile.version && file.version <= version
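Reconstructing a version follows the usual snapshot-plus-deltas pattern that this hunk tidies up: start from the newest snapshot at or before the requested version, then replay every delta file after it up to that version. A sketch of the file selection, reusing the StoreFile case class from the previous sketch (again an illustration, not the provider's code):

object FileSelection {
  // Newest snapshot at or below `version`, plus the delta files to replay on top of it
  // (strictly newer than that snapshot, up to and including `version`).
  def filesToLoad(allFiles: Seq[StoreFile], version: Long): (Option[StoreFile], Seq[StoreFile]) = {
    val latestSnapshot = allFiles
      .filter(f => f.isSnapshot && f.version <= version)
      .sortBy(_.version)
      .lastOption
    val deltas = allFiles
      .filter(f => !f.isSnapshot && f.version <= version &&
        latestSnapshot.forall(s => f.version > s.version))
      .sortBy(_.version)
    (latestSnapshot, deltas)
  }
}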
@@ -579,4 +584,3 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
}
-