diff options
author | Hao Zhu <viadeazhu@gmail.com> | 2015-08-10 17:17:22 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-10 17:17:22 -0700 |
commit | 3c9802d9400bea802984456683b2736a450ee17e (patch) | |
tree | 191947fb299a5b4512b21169cf20387f57801b4f /streaming | |
parent | 853809e948e7c5092643587a30738115b6591a59 (diff) | |
download | spark-3c9802d9400bea802984456683b2736a450ee17e.tar.gz spark-3c9802d9400bea802984456683b2736a450ee17e.tar.bz2 spark-3c9802d9400bea802984456683b2736a450ee17e.zip |
[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Spark streaming deletes the temp file and backup files without checking if they exist or not
Author: Hao Zhu <viadeazhu@gmail.com>
Closes #8082 from viadea/master and squashes the following commits:
242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 2780d5b6ad..6f6b449acc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -192,7 +192,9 @@ class CheckpointWriter( + "'") // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { + fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) Utils.tryWithSafeFinally { fos.write(bytes) @@ -203,7 +205,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { - fs.delete(backupFile, true) // just in case it exists + if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists + } if (!fs.rename(checkpointFile, backupFile)) { logWarning("Could not rename " + checkpointFile + " to " + backupFile) } |