aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala8
1 files changed, 6 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2780d5b6ad..6f6b449acc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -192,7 +192,9 @@ class CheckpointWriter(
+ "'")
// Write checkpoint to temp file
- fs.delete(tempFile, true) // just in case it exists
+ if (fs.exists(tempFile)) {
+ fs.delete(tempFile, true) // just in case it exists
+ }
val fos = fs.create(tempFile)
Utils.tryWithSafeFinally {
fos.write(bytes)
@@ -203,7 +205,9 @@ class CheckpointWriter(
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
- fs.delete(backupFile, true) // just in case it exists
+ if (fs.exists(backupFile)){
+ fs.delete(backupFile, true) // just in case it exists
+ }
if (!fs.rename(checkpointFile, backupFile)) {
logWarning("Could not rename " + checkpointFile + " to " + backupFile)
}