author    Steve Loughran <stevel@apache.org>    2016-08-17 11:42:57 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-08-17 11:43:01 -0700
commit    cc97ea188e1d5b8e851d1a8438b8af092783ec04 (patch)
tree      6128ca95597fd4765608e986f501dafcbbccf464 /streaming/src
parent    4d92af310ad29ade039e4130f91f2a3d9180deef (diff)
[SPARK-16736][CORE][SQL] purge superfluous fs calls
A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` implementations, then removing calls that are superfluous:

1. `delete()` is harmless when called on a nonexistent path, so no existence check is needed before a delete.
2. Any `FileSystem.exists()` check before `getFileStatus()` or `open()` is superfluous, as the operation itself performs the check. Instead, the `FileNotFoundException` is caught and triggers the downgraded path.

Where a `FileNotFoundException` was previously thrown by the caller, the code still raises a new FNFE with the same error message, but now nests the inner exception for easier diagnostics.

Testing initially relies on Jenkins runs. One trouble spot is that some of these code paths are clearly error situations, and it is not clear they had coverage before; creating the failure conditions in tests would be ideal, but it will also be hard.

Author: Steve Loughran <stevel@apache.org>

Closes #14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.
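In sketch form, the pattern the patch applies looks roughly like the following. This is a minimal standalone illustration rather than code from the patch; the object name and the `/tmp/example` path are made up for the example.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object SuperfluousFsCallsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("/tmp/example") // hypothetical path, not from the patch
    val fs   = path.getFileSystem(conf)

    // Before: if (fs.exists(path)) { fs.delete(path, true) } costs an extra round trip.
    // After: delete() returns false on a missing path instead of failing, so call it directly.
    fs.delete(path, true)

    // Before: fs.exists(path) ahead of getFileStatus() duplicates the lookup.
    // After: call getFileStatus() and treat FileNotFoundException as "not there".
    val isDir =
      try {
        fs.getFileStatus(path).isDirectory
      } catch {
        case _: FileNotFoundException => false // downgraded path: nothing to inspect
      }
    println(s"$path is a directory: $isDir")
  }
}
```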
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                  | 17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 27
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala              | 24
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 398fa6500f..5cbad8bf3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
- if (fs.exists(path)) {
+ try {
val statuses = fs.listStatus(path)
if (statuses != null) {
val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@ object Checkpoint extends Logging {
logWarning(s"Listing $path returned null")
Seq.empty
}
- } else {
- logWarning(s"Checkpoint directory $path does not exist")
- Seq.empty
+ } catch {
+ case _: FileNotFoundException =>
+ logWarning(s"Checkpoint directory $path does not exist")
+ Seq.empty
}
}
@@ -229,9 +230,7 @@ class CheckpointWriter(
logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
// Write checkpoint to temp file
- if (fs.exists(tempFile)) {
- fs.delete(tempFile, true) // just in case it exists
- }
+ fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
Utils.tryWithSafeFinally {
fos.write(bytes)
@@ -242,9 +241,7 @@ class CheckpointWriter(
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true) // just in case it exists
- }
+ fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) {
logWarning(s"Could not rename $checkpointFile to $backupFile")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f01b8..845f554308 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.streaming.util
+import java.io.FileNotFoundException
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) &&
- fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
- val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
- pastLogs.clear()
- pastLogs ++= logFileInfo
- logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
- logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ try {
+ // If you call listStatus(file) it returns a stat of the file in the array,
+ // rather than an array listing all the children.
+ // This makes it hard to differentiate listStatus(file) and
+ // listStatus(dir-with-one-child) except by examining the name of the returned status,
+ // and once you've got symlinks in the mix that differentiation isn't easy.
+ // Checking for the path being a directory is one more call to the filesystem, but
+ // leads to much clearer code.
+ if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+ val logFileInfo = logFilesTologInfo(
+ fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ } catch {
+ case _: FileNotFoundException =>
+ // there is no log directory, hence nothing to recover
}
}
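The comment added in the hunk above points at a Hadoop quirk: `FileSystem.listStatus()` on a plain file succeeds and returns a one-element array holding that file's status, which is hard to tell apart from listing a directory with a single child. A rough standalone sketch of the resulting guard, using a hypothetical directory path rather than anything from Spark:

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ListStatusGuardSketch {
  def main(args: Array[String]): Unit = {
    val dir = new Path("/tmp/wal-dir") // hypothetical log directory
    val fs  = dir.getFileSystem(new Configuration())

    val recovered =
      try {
        // Check isDirectory first: listStatus() on a plain file would "succeed"
        // and return that single file's status, which is not what a recovery
        // scan wants.
        if (fs.getFileStatus(dir).isDirectory) {
          fs.listStatus(dir).map(_.getPath.toString).toSeq
        } else {
          Seq.empty
        }
      } catch {
        case _: FileNotFoundException => Seq.empty // no directory, nothing to recover
      }

    println(s"recovered ${recovered.size} log files")
  }
}
```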
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 13a765d035..6a3b3200dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -44,18 +44,16 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
- if (dfs.isFile(dfsPath)) {
- try {
- dfs.open(dfsPath)
- } catch {
- case e: IOException =>
- // If we are really unlucky, the file may be deleted as we're opening the stream.
- // This can happen as clean up is performed by daemon threads that may be left over from
- // previous runs.
- if (!dfs.isFile(dfsPath)) null else throw e
- }
- } else {
- null
+ try {
+ dfs.open(dfsPath)
+ } catch {
+ case _: FileNotFoundException =>
+ null
+ case e: IOException =>
+ // If we are really unlucky, the file may be deleted as we're opening the stream.
+ // This can happen as clean up is performed by daemon threads that may be left over from
+ // previous runs.
+ if (!dfs.isFile(dfsPath)) null else throw e
}
}
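The HdfsUtils change boils down to: open the stream directly, map `FileNotFoundException` to the missing-file result, and keep the existing `IOException` race check. A standalone sketch of that calling pattern, assuming a hypothetical log path and returning `Option` where the patched method returns `null`:

```scala
import java.io.{FileNotFoundException, IOException}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}

object OpenWithoutExistsSketch {
  // Open a stream without a prior exists()/isFile() probe: a missing file
  // surfaces as FileNotFoundException, which is mapped to None here.
  def openIfPresent(path: String, conf: Configuration): Option[FSDataInputStream] = {
    val hdfsPath = new Path(path)
    val fs = hdfsPath.getFileSystem(conf)
    try {
      Some(fs.open(hdfsPath))
    } catch {
      case _: FileNotFoundException => None
      case e: IOException =>
        // The file may be deleted mid-open by cleanup threads; if it is
        // genuinely gone, treat that like "not found", otherwise rethrow.
        if (!fs.isFile(hdfsPath)) None else throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val stream = openIfPresent("/tmp/wal/log-0", new Configuration()) // hypothetical path
    println(s"stream present: ${stream.isDefined}")
    stream.foreach(_.close())
  }
}
```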