aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorHao Zhu <viadeazhu@gmail.com>2015-08-10 17:17:22 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-10 17:17:22 -0700
commit3c9802d9400bea802984456683b2736a450ee17e (patch)
tree191947fb299a5b4512b21169cf20387f57801b4f /streaming
parent853809e948e7c5092643587a30738115b6591a59 (diff)
downloadspark-3c9802d9400bea802984456683b2736a450ee17e.tar.gz
spark-3c9802d9400bea802984456683b2736a450ee17e.tar.bz2
spark-3c9802d9400bea802984456683b2736a450ee17e.zip
[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Spark streaming deletes the temp file and backup files without checking if they exist or not Author: Hao Zhu <viadeazhu@gmail.com> Closes #8082 from viadea/master and squashes the following commits: 242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files. 087daf0 [Hao Zhu] SPARK-9801
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala8
1 files changed, 6 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2780d5b6ad..6f6b449acc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -192,7 +192,9 @@ class CheckpointWriter(
+ "'")
// Write checkpoint to temp file
- fs.delete(tempFile, true) // just in case it exists
+ if (fs.exists(tempFile)) {
+ fs.delete(tempFile, true) // just in case it exists
+ }
val fos = fs.create(tempFile)
Utils.tryWithSafeFinally {
fos.write(bytes)
@@ -203,7 +205,9 @@ class CheckpointWriter(
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
- fs.delete(backupFile, true) // just in case it exists
+ if (fs.exists(backupFile)){
+ fs.delete(backupFile, true) // just in case it exists
+ }
if (!fs.rename(checkpointFile, backupFile)) {
logWarning("Could not rename " + checkpointFile + " to " + backupFile)
}