aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Loughran <stevel@apache.org>2016-08-17 11:42:57 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-17 11:43:01 -0700
commitcc97ea188e1d5b8e851d1a8438b8af092783ec04 (patch)
tree6128ca95597fd4765608e986f501dafcbbccf464
parent4d92af310ad29ade039e4130f91f2a3d9180deef (diff)
downloadspark-cc97ea188e1d5b8e851d1a8438b8af092783ec04.tar.gz
spark-cc97ea188e1d5b8e851d1a8438b8af092783ec04.tar.bz2
spark-cc97ea188e1d5b8e851d1a8438b8af092783ec04.zip
[SPARK-16736][CORE][SQL] purge superfluous fs calls
A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` code, then removing uses of the calls when superfluous. 1. delete is harmless if called on a nonexistent path, so don't do any checks before deletes 1. any `FileSystem.exists()` check before `getFileStatus()` or `open()` is superfluous as the operation itself does the check. Instead the `FileNotFoundException` is caught and triggers the downgraded path. When a `FileNotFoundException` was thrown before, the code still creates a new FNFE with the error messages. Though now the inner exceptions are nested, for easier diagnostics. Initially, relying on Jenkins test runs. One troublespot here is that some of the codepaths are clearly error situations; it's not clear that they have coverage anyway. Trying to create the failure conditions in tests would be ideal, but it will also be hard. Author: Steve Loughran <stevel@apache.org> Closes #14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala13
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala32
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala17
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala27
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala24
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala5
13 files changed, 92 insertions, 109 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a6853fe398..60f042f1e0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1410,9 +1410,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val scheme = new URI(schemeCorrectedPath).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
- if (!fs.exists(hadoopPath)) {
- throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
- }
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bc09935f93..6874aa5f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def startPolling(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
- if (!fs.exists(path)) {
- var msg = s"Log directory specified does not exist: $logDir"
- if (logDir == DEFAULT_LOG_DIR) {
- msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+ try {
+ if (!fs.getFileStatus(path).isDirectory) {
+ throw new IllegalArgumentException(
+ "Logging directory specified is not a directory: %s".format(logDir))
}
- throw new IllegalArgumentException(msg)
- }
- if (!fs.getFileStatus(path).isDirectory) {
- throw new IllegalArgumentException(
- "Logging directory specified is not a directory: %s".format(logDir))
+ } catch {
+ case f: FileNotFoundException =>
+ var msg = s"Log directory specified does not exist: $logDir"
+ if (logDir == DEFAULT_LOG_DIR) {
+ msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+ }
+ throw new FileNotFoundException(msg).initCause(f)
}
// Disable the background thread during tests.
@@ -495,12 +497,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
attemptsToClean.foreach { attempt =>
try {
- val path = new Path(logDir, attempt.logPath)
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- logWarning(s"Error deleting ${path}")
- }
- }
+ fs.delete(new Path(logDir, attempt.logPath), true)
} catch {
case e: AccessControlException =>
logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fddb935301..ab6554fd8a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val tempOutputPath =
new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
- if (fs.exists(tempOutputPath)) {
- throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
- }
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
- if (fs.exists(partitionerFilePath)) {
- val fileInputStream = fs.open(partitionerFilePath, bufferSize)
- val serializer = SparkEnv.get.serializer.newInstance()
- val deserializeStream = serializer.deserializeStream(fileInputStream)
- val partitioner = Utils.tryWithSafeFinally[Partitioner] {
- deserializeStream.readObject[Partitioner]
- } {
- deserializeStream.close()
- }
- logDebug(s"Read partitioner from $partitionerFilePath")
- Some(partitioner)
- } else {
- logDebug("No partitioner file")
- None
+ val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+ val serializer = SparkEnv.get.serializer.newInstance()
+ val deserializeStream = serializer.deserializeStream(fileInputStream)
+ val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+ deserializeStream.readObject[Partitioner]
+ } {
+ deserializeStream.close()
}
+ logDebug(s"Read partitioner from $partitionerFilePath")
+ Some(partitioner)
} catch {
+ case e: FileNotFoundException =>
+ logDebug("No partitioner file", e)
+ None
case NonFatal(e) =>
logWarning(s"Error reading partitioner from $checkpointDirPath, " +
s"partitioner will not be recovered which may lead to performance loss", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 74f187642a..b6d723c682 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
/** Clean up the files associated with the checkpoint data for this RDD. */
def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
checkpointPath(sc, rddId).foreach { path =>
- val fs = path.getFileSystem(sc.hadoopConfiguration)
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- logWarning(s"Error deleting ${path.toString()}")
- }
- }
+ path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7d0639117..ce7877469f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
*/
def start() {
if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
- throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+ throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
}
val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@ private[spark] class EventLoggingListener(
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"
- if (shouldOverwrite && fileSystem.exists(path)) {
+ if (shouldOverwrite && fileSystem.delete(path, true)) {
logWarning(s"Event log $path already exists. Overwriting...")
- if (!fileSystem.delete(path, true)) {
- logWarning(s"Error deleting $path")
- }
}
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@ private[spark] object EventLoggingListener extends Logging {
* @return input stream that holds one JSON record per line.
*/
def openEventLog(log: Path, fs: FileSystem): InputStream = {
- // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
- // IOException when a file does not exist, so try our best to throw a proper exception.
- if (!fs.exists(log)) {
- throw new FileNotFoundException(s"File $log does not exist.")
- }
-
val in = new BufferedInputStream(fs.open(log))
// Compression codec is encoded as an extension, e.g. app_123.lzf
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 2f07395edf..df13b32451 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.repl
-import java.io.{ByteArrayOutputStream, FilterInputStream, InputStream, IOException}
+import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream, IOException}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import java.nio.channels.Channels
@@ -147,10 +147,11 @@ class ExecutorClassLoader(
private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
pathInDirectory: String): InputStream = {
val path = new Path(directory, pathInDirectory)
- if (fileSystem.exists(path)) {
+ try {
fileSystem.open(path)
- } else {
- throw new ClassNotFoundException(s"Class file not found at path $path")
+ } catch {
+ case _: FileNotFoundException =>
+ throw new ClassNotFoundException(s"Class file not found at path $path")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755fd3..bec966b15e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.streaming.state
-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider(
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
- if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+ if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
}
logInfo("Aborted")
@@ -278,14 +278,12 @@ private[state] class HDFSBackedStateStoreProvider(
/** Initialize the store provider */
private def initialize(): Unit = {
- if (!fs.exists(baseDir)) {
+ try {
fs.mkdirs(baseDir)
- } else {
- if (!fs.isDirectory(baseDir)) {
+ } catch {
+ case e: IOException =>
throw new IllegalStateException(
- s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
- s"$baseDir already exists and is not a directory")
- }
+ s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
}
}
@@ -340,13 +338,16 @@ private[state] class HDFSBackedStateStoreProvider(
private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
val fileToRead = deltaFile(version)
- if (!fs.exists(fileToRead)) {
- throw new IllegalStateException(
- s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
- }
var input: DataInputStream = null
+ val sourceStream = try {
+ fs.open(fileToRead)
+ } catch {
+ case f: FileNotFoundException =>
+ throw new IllegalStateException(
+ s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+ }
try {
- input = decompressStream(fs.open(fileToRead))
+ input = decompressStream(sourceStream)
var eof = false
while(!eof) {
@@ -405,8 +406,6 @@ private[state] class HDFSBackedStateStoreProvider(
private def readSnapshotFile(version: Long): Option[MapType] = {
val fileToRead = snapshotFile(version)
- if (!fs.exists(fileToRead)) return None
-
val map = new MapType()
var input: DataInputStream = null
@@ -443,6 +442,9 @@ private[state] class HDFSBackedStateStoreProvider(
}
logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
Some(map)
+ } catch {
+ case _: FileNotFoundException =>
+ None
} finally {
if (input != null) input.close()
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index e73117c814..061c7431a6 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -75,9 +75,7 @@ public class JavaMetastoreDataSourcesSuite {
hiveManagedPath = new Path(
catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
- if (fs.exists(hiveManagedPath)){
- fs.delete(hiveManagedPath, true);
- }
+ fs.delete(hiveManagedPath, true);
List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c36b0275f4..3892fe87e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -375,7 +375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
- if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+ fs.delete(filesystemPath, true)
// It is a managed table when we do not specify the location.
sql(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 398fa6500f..5cbad8bf3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
- if (fs.exists(path)) {
+ try {
val statuses = fs.listStatus(path)
if (statuses != null) {
val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@ object Checkpoint extends Logging {
logWarning(s"Listing $path returned null")
Seq.empty
}
- } else {
- logWarning(s"Checkpoint directory $path does not exist")
- Seq.empty
+ } catch {
+ case _: FileNotFoundException =>
+ logWarning(s"Checkpoint directory $path does not exist")
+ Seq.empty
}
}
@@ -229,9 +230,7 @@ class CheckpointWriter(
logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
// Write checkpoint to temp file
- if (fs.exists(tempFile)) {
- fs.delete(tempFile, true) // just in case it exists
- }
+ fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
Utils.tryWithSafeFinally {
fos.write(bytes)
@@ -242,9 +241,7 @@ class CheckpointWriter(
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true) // just in case it exists
- }
+ fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) {
logWarning(s"Could not rename $checkpointFile to $backupFile")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f01b8..845f554308 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.streaming.util
+import java.io.FileNotFoundException
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) &&
- fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
- val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
- pastLogs.clear()
- pastLogs ++= logFileInfo
- logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
- logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ try {
+ // If you call listStatus(file) it returns a stat of the file in the array,
+ // rather than an array listing all the children.
+ // This makes it hard to differentiate listStatus(file) and
+ // listStatus(dir-with-one-child) except by examining the name of the returned status,
+ // and once you've got symlinks in the mix that differentiation isn't easy.
+ // Checking for the path being a directory is one more call to the filesystem, but
+ // leads to much clearer code.
+ if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+ val logFileInfo = logFilesTologInfo(
+ fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ } catch {
+ case _: FileNotFoundException =>
+ // there is no log directory, hence nothing to recover
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 13a765d035..6a3b3200dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -44,18 +44,16 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
- if (dfs.isFile(dfsPath)) {
- try {
- dfs.open(dfsPath)
- } catch {
- case e: IOException =>
- // If we are really unlucky, the file may be deleted as we're opening the stream.
- // This can happen as clean up is performed by daemon threads that may be left over from
- // previous runs.
- if (!dfs.isFile(dfsPath)) null else throw e
- }
- } else {
- null
+ try {
+ dfs.open(dfsPath)
+ } catch {
+ case _: FileNotFoundException =>
+ null
+ case e: IOException =>
+ // If we are really unlucky, the file may be deleted as we're opening the stream.
+ // This can happen as clean up is performed by daemon threads that may be left over from
+ // previous runs.
+ if (!dfs.isFile(dfsPath)) null else throw e
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e3572d781b..93684005f1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -189,9 +189,8 @@ private[spark] class Client(
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
val fs = stagingDirPath.getFileSystem(hadoopConf)
- if (!preserveFiles && fs.exists(stagingDirPath)) {
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
+ if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+ logInfo(s"Deleted staging directory $stagingDirPath")
}
} catch {
case ioe: IOException =>