Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                  | 17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 27
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala              | 24
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 398fa6500f..5cbad8bf3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
- if (fs.exists(path)) {
+ try {
val statuses = fs.listStatus(path)
if (statuses != null) {
val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@ object Checkpoint extends Logging {
logWarning(s"Listing $path returned null")
Seq.empty
}
- } else {
- logWarning(s"Checkpoint directory $path does not exist")
- Seq.empty
+ } catch {
+ case _: FileNotFoundException =>
+ logWarning(s"Checkpoint directory $path does not exist")
+ Seq.empty
}
}
@@ -229,9 +230,7 @@ class CheckpointWriter(
logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
// Write checkpoint to temp file
- if (fs.exists(tempFile)) {
- fs.delete(tempFile, true) // just in case it exists
- }
+ fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
Utils.tryWithSafeFinally {
fos.write(bytes)
@@ -242,9 +241,7 @@ class CheckpointWriter(
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true) // just in case it exists
- }
+ fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) {
logWarning(s"Could not rename $checkpointFile to $backupFile")
}
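
The pattern in this first file is worth calling out: rather than probing with fs.exists() and then acting, the new code acts directly and treats FileNotFoundException as the "absent" case; the exists() guards before the delete() calls are dropped outright, since FileSystem.delete() simply returns false when the target is missing. Each removed probe saves a NameNode round trip (or, on an object store, an extra HEAD/LIST request). A minimal sketch of the idiom, with a made-up helper name and assuming only a Hadoop Configuration on the classpath:

import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ExistsFreeListing {
  // Hypothetical helper (not part of the patch) showing the idiom:
  // list the directory directly and treat FileNotFoundException as
  // "no checkpoint directory", instead of paying for fs.exists() first.
  def listCheckpointPaths(checkpointDir: String, conf: Configuration): Seq[Path] = {
    val dir = new Path(checkpointDir)
    val fs = dir.getFileSystem(conf)
    try {
      fs.listStatus(dir).map(_.getPath).toSeq
    } catch {
      case _: FileNotFoundException => Seq.empty // directory absent: nothing to list
    }
  }
}
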
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f01b8..845f554308 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.streaming.util
+import java.io.FileNotFoundException
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) &&
- fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
- val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
- pastLogs.clear()
- pastLogs ++= logFileInfo
- logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
- logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ try {
+ // Calling listStatus(file) returns an array containing that file's own status,
+ // rather than an array listing a directory's children.
+ // This makes it hard to tell listStatus(file) apart from
+ // listStatus(dir-with-one-child) except by examining the name of the returned status,
+ // and once symlinks are in the mix even that is unreliable.
+ // Checking whether the path is a directory costs one more call to the filesystem, but
+ // leads to much clearer code.
+ if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+ val logFileInfo = logFilesTologInfo(
+ fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ } catch {
+ case _: FileNotFoundException =>
+ // there is no log directory, hence nothing to recover
}
}
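
The long comment in this hunk explains why the patch keeps one explicit status call: listStatus(file) returns an array containing that file's own status, so the shape of the result alone cannot distinguish a file from a directory with a single child. A sketch of the resulting recovery logic, using a hypothetical name and plain Path.getFileSystem in place of HdfsUtils.getFileSystemForPath:

import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object WalRecoverySketch {
  // Hypothetical sketch of the recovery pattern above: an explicit
  // getFileStatus(...).isDirectory check tells files and directories apart,
  // while a missing directory surfaces as FileNotFoundException and simply
  // means there is nothing to recover.
  def recoverableLogs(logDirectory: String, conf: Configuration): Seq[Path] = {
    val dir = new Path(logDirectory)
    val fs = dir.getFileSystem(conf)
    try {
      if (fs.getFileStatus(dir).isDirectory) {
        fs.listStatus(dir).map(_.getPath).toSeq
      } else {
        Seq.empty // a plain file at this path holds no recoverable logs
      }
    } catch {
      case _: FileNotFoundException => Seq.empty // no log directory yet
    }
  }
}
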
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 13a765d035..6a3b3200dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -44,18 +44,16 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
- if (dfs.isFile(dfsPath)) {
- try {
- dfs.open(dfsPath)
- } catch {
- case e: IOException =>
- // If we are really unlucky, the file may be deleted as we're opening the stream.
- // This can happen as clean up is performed by daemon threads that may be left over from
- // previous runs.
- if (!dfs.isFile(dfsPath)) null else throw e
- }
- } else {
- null
+ try {
+ dfs.open(dfsPath)
+ } catch {
+ case _: FileNotFoundException =>
+ null
+ case e: IOException =>
+ // If we are really unlucky, the file may be deleted as we're opening the stream.
+ // This can happen as cleanup is performed by daemon threads that may be left over from
+ // previous runs.
+ if (!dfs.isFile(dfsPath)) null else throw e
}
}
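
Two details of this last hunk are easy to miss. First, FileNotFoundException is a subclass of IOException, so the more specific case must be listed first; reversed, the IOException arm would intercept it and pay for an extra dfs.isFile() probe on the common "no such file" path. Second, the method's contract is to return null for a vanished file, which callers must handle. A hypothetical caller sketch (the path is made up, and this ignores that HdfsUtils is private[streaming]):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.util.HdfsUtils

object ReadWalFile {
  def main(args: Array[String]): Unit = {
    // getInputStream now returns null whenever the file has already been
    // cleaned up, so callers treat null as "nothing to read", not an error.
    val in = HdfsUtils.getInputStream("/tmp/spark/wal/log-0", new Configuration())
    if (in != null) {
      try {
        // ... read write-ahead-log records from the stream ...
      } finally {
        in.close()
      }
    }
  }
}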