aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala17
1 files changed, 5 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0395600954..7d8b8679c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -184,8 +184,7 @@ class CheckpointWriter(
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
- private var _fs: FileSystem = _
-
+ @volatile private[this] var fs: FileSystem = null
@volatile private var latestCheckpointTime: Time = null
class CheckpointWriteHandler(
@@ -196,6 +195,9 @@ class CheckpointWriter(
if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
latestCheckpointTime = checkpointTime
}
+ if (fs == null) {
+ fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+ }
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
@@ -263,7 +265,7 @@ class CheckpointWriter(
case ioe: IOException =>
logWarning("Error in attempt " + attempts + " of writing checkpoint to "
+ checkpointFile, ioe)
- reset()
+ fs = null
}
}
logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
@@ -297,15 +299,6 @@ class CheckpointWriter(
", waited for " + (endTime - startTime) + " ms.")
stopped = true
}
-
- private def fs = synchronized {
- if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
- _fs
- }
-
- private def reset() = synchronized {
- _fs = null
- }
}