diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 1e0a4a5d4f..3335755fd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -46,12 +46,14 @@ import org.apache.spark.util.Utils * Usage: * To update the data in the state store, the following order of operations are needed. * - * - val store = StateStore.get(operatorId, partitionId, version) // to get the right store - * - store.update(...) + * // get the right store + * - val store = StateStore.get( + * StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...) + * - store.put(...) * - store.remove(...) - * - store.commit() // commits all the updates to made with version number + * - store.commit() // commits all the updates made; the new version will be returned * - store.iterator() // key-value data after last commit as an iterator - * - store.updates() // updates made in the last as an iterator + * - store.updates() // updates made in the last commit as an iterator * * Fault-tolerance model: * - Every set of updates is written to a delta file before committing. 
@@ -99,7 +101,7 @@ private[state] class HDFSBackedStateStoreProvider( } override def put(key: UnsafeRow, value: UnsafeRow): Unit = { - verify(state == UPDATING, "Cannot remove after already committed or cancelled") + verify(state == UPDATING, "Cannot remove after already committed or aborted") val isNewKey = !mapToUpdate.containsKey(key) mapToUpdate.put(key, value) @@ -109,7 +111,7 @@ private[state] class HDFSBackedStateStoreProvider( // Value did not exist in previous version and was added already, keep it marked as added allUpdates.put(key, ValueAdded(key, value)) case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) => - // Value existed in prev version and updated/removed, mark it as updated + // Value existed in previous version and updated/removed, mark it as updated allUpdates.put(key, ValueUpdated(key, value)) case None => // There was no prior update, so mark this as added or updated according to its presence @@ -122,7 +124,7 @@ private[state] class HDFSBackedStateStoreProvider( /** Remove keys that match the following condition */ override def remove(condition: UnsafeRow => Boolean): Unit = { - verify(state == UPDATING, "Cannot remove after already committed or cancelled") + verify(state == UPDATING, "Cannot remove after already committed or aborted") val keyIter = mapToUpdate.keySet().iterator() while (keyIter.hasNext) { val key = keyIter.next @@ -146,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider( /** Commit all the updates that have been made to the store, and return the new version. */ override def commit(): Long = { - verify(state == UPDATING, "Cannot commit after already committed or cancelled") + verify(state == UPDATING, "Cannot commit after already committed or aborted") try { finalizeDeltaFile(tempDeltaFileStream) @@ -161,8 +163,10 @@ private[state] class HDFSBackedStateStoreProvider( } } - /** Cancel all the updates made on this store. This store will not be usable any more. */ + /** Abort all the updates made on this store. 
This store will not be usable any more. */ override def abort(): Unit = { + verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") + state = ABORTED if (tempDeltaFileStream != null) { tempDeltaFileStream.close() @@ -170,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider( if (tempDeltaFile != null && fs.exists(tempDeltaFile)) { fs.delete(tempDeltaFile, true) } - logInfo("Canceled ") + logInfo("Aborted") } /** @@ -178,7 +182,8 @@ private[state] class HDFSBackedStateStoreProvider( * This can be called only after committing all the updates made in the current thread. */ override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = { - verify(state == COMMITTED, "Cannot get iterator of store data before committing") + verify(state == COMMITTED, + "Cannot get iterator of store data before committing or after aborting") HDFSBackedStateStoreProvider.this.iterator(newVersion) } @@ -187,7 +192,8 @@ private[state] class HDFSBackedStateStoreProvider( * This can be called only after committing all the updates made in the current thread. 
*/ override def updates(): Iterator[StoreUpdate] = { - verify(state == COMMITTED, "Cannot get iterator of updates before committing") + verify(state == COMMITTED, + "Cannot get iterator of updates before committing or after aborting") allUpdates.values().asScala.toIterator } @@ -223,7 +229,7 @@ private[state] class HDFSBackedStateStoreProvider( } override def toString(): String = { - s"StateStore[id = (op=${id.operatorId},part=${id.partitionId}), dir = $baseDir]" + s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]" } /* Internal classes and methods */ @@ -277,7 +283,7 @@ private[state] class HDFSBackedStateStoreProvider( } else { if (!fs.isDirectory(baseDir)) { throw new IllegalStateException( - s"Cannot use ${id.checkpointLocation} for storing state data for $this as" + + s"Cannot use ${id.checkpointLocation} for storing state data for $this as " + s"$baseDir already exists and is not a directory") } } @@ -453,11 +459,11 @@ private[state] class HDFSBackedStateStoreProvider( filesForVersion(files, lastVersion).filter(_.isSnapshot == false) synchronized { loadedMaps.get(lastVersion) } match { case Some(map) => - if (deltaFilesForLastVersion.size > storeConf.maxDeltasForSnapshot) { + if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) { writeSnapshotFile(lastVersion, map) } case None => - // The last map is not loaded, probably some other instance is incharge + // The last map is not loaded, probably some other instance is in charge } } @@ -506,7 +512,6 @@ private[state] class HDFSBackedStateStoreProvider( .lastOption val deltaBatchFiles = latestSnapshotFileBeforeVersion match { case Some(snapshotFile) => - val deltaBatchIds = (snapshotFile.version + 1) to version val deltaFiles = allFiles.filter { file => file.version > snapshotFile.version && file.version <= version @@ -579,4 +584,3 @@ private[state] class HDFSBackedStateStoreProvider( } } } - |