diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 17 |
1 files changed, 5 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 0395600954..7d8b8679c5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -184,8 +184,7 @@ class CheckpointWriter( val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) private var stopped = false - private var _fs: FileSystem = _ - + @volatile private[this] var fs: FileSystem = null @volatile private var latestCheckpointTime: Time = null class CheckpointWriteHandler( @@ -196,6 +195,9 @@ class CheckpointWriter( if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) { latestCheckpointTime = checkpointTime } + if (fs == null) { + fs = new Path(checkpointDir).getFileSystem(hadoopConf) + } var attempts = 0 val startTime = System.currentTimeMillis() val tempFile = new Path(checkpointDir, "temp") @@ -263,7 +265,7 @@ class CheckpointWriter( case ioe: IOException => logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe) - reset() + fs = null } } logWarning("Could not write checkpoint for time " + checkpointTime + " to file " @@ -297,15 +299,6 @@ class CheckpointWriter( ", waited for " + (endTime - startTime) + " ms.") stopped = true } - - private def fs = synchronized { - if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf) - _fs - } - - private def reset() = synchronized { - _fs = null - } } |