about summary refs log tree commit diff
path: root/sql/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 32
1 file changed, 17 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755fd3..bec966b15e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.streaming.state
-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider(
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
- if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+ if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
}
logInfo("Aborted")
@@ -278,14 +278,12 @@ private[state] class HDFSBackedStateStoreProvider(
/** Initialize the store provider */
private def initialize(): Unit = {
- if (!fs.exists(baseDir)) {
+ try {
fs.mkdirs(baseDir)
- } else {
- if (!fs.isDirectory(baseDir)) {
+ } catch {
+ case e: IOException =>
throw new IllegalStateException(
- s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
- s"$baseDir already exists and is not a directory")
- }
+ s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
}
}
@@ -340,13 +338,16 @@ private[state] class HDFSBackedStateStoreProvider(
private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
val fileToRead = deltaFile(version)
- if (!fs.exists(fileToRead)) {
- throw new IllegalStateException(
- s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
- }
var input: DataInputStream = null
+ val sourceStream = try {
+ fs.open(fileToRead)
+ } catch {
+ case f: FileNotFoundException =>
+ throw new IllegalStateException(
+ s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+ }
try {
- input = decompressStream(fs.open(fileToRead))
+ input = decompressStream(sourceStream)
var eof = false
while(!eof) {
@@ -405,8 +406,6 @@ private[state] class HDFSBackedStateStoreProvider(
private def readSnapshotFile(version: Long): Option[MapType] = {
val fileToRead = snapshotFile(version)
- if (!fs.exists(fileToRead)) return None
-
val map = new MapType()
var input: DataInputStream = null
@@ -443,6 +442,9 @@ private[state] class HDFSBackedStateStoreProvider(
}
logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
Some(map)
+ } catch {
+ case _: FileNotFoundException =>
+ None
} finally {
if (input != null) input.close()
}