diff options
author | guifeng <guifengleaf@gmail.com> | 2017-03-02 21:19:29 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-02 21:19:29 -0800 |
commit | e24f21b5f8365ed25346e986748b393e0b4be25c (patch) | |
tree | 800ceebcd56c5b1b24896685f837c12c7447bacf /sql/core/src/main/scala | |
parent | f37bb143022ea10107877c80c5c73bd77aeda7ff (diff) | |
download | spark-e24f21b5f8365ed25346e986748b393e0b4be25c.tar.gz spark-e24f21b5f8365ed25346e986748b393e0b4be25c.tar.bz2 spark-e24f21b5f8365ed25346e986748b393e0b4be25c.zip |
[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
## What changes were proposed in this pull request?
[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)
The PR (https://github.com/apache/spark/pull/17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future.
## How was this patch tested?
unit tests
Author: guifeng <guifengleaf@gmail.com>
Closes #17124 from gf53520/SPARK-19779.
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 2d29940eb8..ab1204a750 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider( // semantically correct because Structured Streaming requires rerunning a batch should // generate the same output. (SPARK-19677) // scalastyle:on - if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) { + if (fs.exists(finalDeltaFile)) { + fs.delete(tempDeltaFile, true) + } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) { throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") } loadedMaps.put(newVersion, map) |