diff options
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 3335755fd3..bec966b15e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.{DataInputStream, DataOutputStream, IOException} +import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider( if (tempDeltaFileStream != null) { tempDeltaFileStream.close() } - if (tempDeltaFile != null && fs.exists(tempDeltaFile)) { + if (tempDeltaFile != null) { fs.delete(tempDeltaFile, true) } logInfo("Aborted") @@ -278,14 +278,12 @@ private[state] class HDFSBackedStateStoreProvider( /** Initialize the store provider */ private def initialize(): Unit = { - if (!fs.exists(baseDir)) { + try { fs.mkdirs(baseDir) - } else { - if (!fs.isDirectory(baseDir)) { + } catch { + case e: IOException => throw new IllegalStateException( - s"Cannot use ${id.checkpointLocation} for storing state data for $this as " + - s"$baseDir already exists and is not a directory") - } + s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e) } } @@ -340,13 +338,16 @@ private[state] class HDFSBackedStateStoreProvider( private def updateFromDeltaFile(version: Long, map: MapType): Unit = { val fileToRead = deltaFile(version) - if (!fs.exists(fileToRead)) { - throw new IllegalStateException( - s"Error reading delta file $fileToRead of $this: $fileToRead does not exist") - } var input: DataInputStream = null + val sourceStream = try { + fs.open(fileToRead) + } catch { + case f: FileNotFoundException => + throw new IllegalStateException( + s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f) + } try { - input = decompressStream(fs.open(fileToRead)) + input = decompressStream(sourceStream) var eof = false while(!eof) { @@ -405,8 +406,6 @@ private[state] class HDFSBackedStateStoreProvider( private def readSnapshotFile(version: Long): Option[MapType] = { val fileToRead = snapshotFile(version) - if (!fs.exists(fileToRead)) return None - val map = new MapType() var input: DataInputStream = null @@ -443,6 +442,9 @@ private[state] class HDFSBackedStateStoreProvider( } logInfo(s"Read snapshot file for version $version of $this from $fileToRead") Some(map) + } catch { + case _: FileNotFoundException => + None } finally { if (input != null) input.close() } |