diff options
author | Roberto Agostino Vitillo <ra.vitillo@gmail.com> | 2017-02-28 10:49:07 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-02-28 10:49:07 -0800 |
commit | 9734a928a75d29ea202e9f309f92ca4637d35671 (patch) | |
tree | 6074e39c3c117374bb2c0006ba56d6d8d700e162 /sql/core/src | |
parent | 7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd (diff) | |
download | spark-9734a928a75d29ea202e9f309f92ca4637d35671.tar.gz spark-9734a928a75d29ea202e9f309f92ca4637d35671.tar.bz2 spark-9734a928a75d29ea202e9f309f92ca4637d35671.zip |
[SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS
## What changes were proposed in this pull request?
HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies:
> Destination exists and is a file
> Renaming a file atop an existing file is specified as failing, raising an exception.
> - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
> - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.
This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.
## How was this patch tested?
This patch was tested by running `StateStoreSuite`.
Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>
Closes #17012 from vitillo/fix_rename.
Diffstat (limited to 'sql/core/src')
2 files changed, 34 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 61eb601a18..2d29940eb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -274,7 +274,16 @@ private[state] class HDFSBackedStateStoreProvider( private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { synchronized { val finalDeltaFile = deltaFile(newVersion) - if (!fs.rename(tempDeltaFile, finalDeltaFile)) { + + // scalastyle:off + // Renaming a file atop an existing one fails on HDFS + // (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html). + // Hence we should either skip the rename step or delete the target file. Because deleting the + // target file will break speculation, skipping the rename step is the only choice. It's still + // semantically correct because Structured Streaming requires rerunning a batch should + // generate the same output. (SPARK-19677) + // scalastyle:on + if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) { throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") } loadedMaps.put(newVersion, map) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 6b38b6a097..dc4e935f3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth assert(store1.commit() === 2) assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1)) assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1)) - - // Overwrite the version with other data - val store2 = provider.getStore(1) - put(store2, "c", 1) - assert(store2.commit() === 2) - assert(rowsToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1)) - assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1)) } test("snapshotting") { @@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth assert(getDataFromFiles(provider, 19) === Set("a" -> 19)) } + test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") { + val conf = new Configuration() + conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName) + conf.set("fs.default.name", "fake:///") + + val provider = newStoreProvider(hadoopConf = conf) + provider.getStore(0).commit() + provider.getStore(0).commit() + } test("corrupted file handling") { val provider = newStoreProvider(minDeltasForSnapshot = 5) @@ -682,6 +684,21 @@ private[state] object StateStoreSuite { } /** + * Fake FileSystem that simulates HDFS rename semantic, i.e. renaming a file atop an existing + * one should return false. + * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html + */ +class RenameLikeHDFSFileSystem extends RawLocalFileSystem { + override def rename(src: Path, dst: Path): Boolean = { + if (exists(dst)) { + return false + } else { + return super.rename(src, dst) + } + } +} + +/** * Fake FileSystem to test that the StateStore throws an exception while committing the * delta file, when `fs.rename` returns `false`. */ |