aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorRoberto Agostino Vitillo <ra.vitillo@gmail.com>2017-02-28 10:49:07 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-02-28 10:49:07 -0800
commit9734a928a75d29ea202e9f309f92ca4637d35671 (patch)
tree6074e39c3c117374bb2c0006ba56d6d8d700e162 /sql/core/src/test
parent7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd (diff)
downloadspark-9734a928a75d29ea202e9f309f92ca4637d35671.tar.gz
spark-9734a928a75d29ea202e9f309f92ca4637d35671.tar.bz2
spark-9734a928a75d29ea202e9f309f92ca4637d35671.zip
[SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS
## What changes were proposed in this pull request? HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies: > Destination exists and is a file > Renaming a file atop an existing file is specified as failing, raising an exception. > - Local FileSystem : the rename succeeds; the destination file is replaced by the source file. > - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false. This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output. ## How was this patch tested? This patch was tested by running `StateStoreSuite`. Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com> Closes #17012 from vitillo/fix_rename.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala31
1 files changed, 24 insertions, 7 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6b38b6a097..dc4e935f3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
assert(store1.commit() === 2)
assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))
-
- // Overwrite the version with other data
- val store2 = provider.getStore(1)
- put(store2, "c", 1)
- assert(store2.commit() === 2)
- assert(rowsToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1))
- assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1))
}
test("snapshotting") {
@@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
}
+ test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
+ val conf = new Configuration()
+ conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
+ conf.set("fs.default.name", "fake:///")
+
+ val provider = newStoreProvider(hadoopConf = conf)
+ provider.getStore(0).commit()
+ provider.getStore(0).commit()
+ }
test("corrupted file handling") {
val provider = newStoreProvider(minDeltasForSnapshot = 5)
@@ -682,6 +684,21 @@ private[state] object StateStoreSuite {
}
/**
+ * Fake FileSystem that simulates HDFS rename semantic, i.e. renaming a file atop an existing
+ * one should return false.
+ * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
+ */
+class RenameLikeHDFSFileSystem extends RawLocalFileSystem {
+ override def rename(src: Path, dst: Path): Boolean = {
+ if (exists(dst)) {
+ return false
+ } else {
+ return super.rename(src, dst)
+ }
+ }
+}
+
+/**
* Fake FileSystem to test that the StateStore throws an exception while committing the
* delta file, when `fs.rename` returns `false`.
*/