 core/src/main/scala/org/apache/spark/SparkContext.scala                                                   |  3
 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala                               | 27
 core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala                                      | 31
 core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala                                  |  7
 core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala                                 | 13
 repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala                                       |  9
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 32
 sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java                       |  4
 sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala                         |  2
 streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                                      | 17
 streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala                     | 27
 streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala                                  | 24
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                                             |  5
13 files changed, 92 insertions, 109 deletions
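Every hunk in this patch applies the same idiom: rather than probing with fs.exists() and then acting on the path (an extra namenode round trip, plus a race window between the check and the use), the code acts on the path directly and treats FileNotFoundException as the "path is absent" outcome. A minimal sketch of that idiom, not taken from the patch itself (the helper name and the use of getFileStatus are illustrative only):

    import java.io.FileNotFoundException
    import org.apache.hadoop.fs.{FileSystem, Path}

    // One call to the filesystem instead of exists() followed by getFileStatus().
    // A missing path surfaces as FileNotFoundException, which we map to None.
    def fileSize(fs: FileSystem, path: Path): Option[Long] =
      try {
        Some(fs.getFileStatus(path).getLen)
      } catch {
        case _: FileNotFoundException => None
      }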
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a6853fe398..60f042f1e0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1410,9 +1410,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val scheme = new URI(schemeCorrectedPath).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
- if (!fs.exists(hadoopPath)) {
- throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
- }
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bc09935f93..6874aa5f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def startPolling(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
- if (!fs.exists(path)) {
- var msg = s"Log directory specified does not exist: $logDir"
- if (logDir == DEFAULT_LOG_DIR) {
- msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+ try {
+ if (!fs.getFileStatus(path).isDirectory) {
+ throw new IllegalArgumentException(
+ "Logging directory specified is not a directory: %s".format(logDir))
}
- throw new IllegalArgumentException(msg)
- }
- if (!fs.getFileStatus(path).isDirectory) {
- throw new IllegalArgumentException(
- "Logging directory specified is not a directory: %s".format(logDir))
+ } catch {
+ case f: FileNotFoundException =>
+ var msg = s"Log directory specified does not exist: $logDir"
+ if (logDir == DEFAULT_LOG_DIR) {
+ msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+ }
+ throw new FileNotFoundException(msg).initCause(f)
}
// Disable the background thread during tests.
@@ -495,12 +497,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
attemptsToClean.foreach { attempt =>
try {
- val path = new Path(logDir, attempt.logPath)
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- logWarning(s"Error deleting ${path}")
- }
- }
+ fs.delete(new Path(logDir, attempt.logPath), true)
} catch {
case e: AccessControlException =>
logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fddb935301..ab6554fd8a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val tempOutputPath =
new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
- if (fs.exists(tempOutputPath)) {
- throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
- }
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
- if (fs.exists(partitionerFilePath)) {
- val fileInputStream = fs.open(partitionerFilePath, bufferSize)
- val serializer = SparkEnv.get.serializer.newInstance()
- val deserializeStream = serializer.deserializeStream(fileInputStream)
- val partitioner = Utils.tryWithSafeFinally[Partitioner] {
- deserializeStream.readObject[Partitioner]
- } {
- deserializeStream.close()
- }
- logDebug(s"Read partitioner from $partitionerFilePath")
- Some(partitioner)
- } else {
- logDebug("No partitioner file")
- None
+ val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+ val serializer = SparkEnv.get.serializer.newInstance()
+ val deserializeStream = serializer.deserializeStream(fileInputStream)
+ val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+ deserializeStream.readObject[Partitioner]
+ } {
+ deserializeStream.close()
}
+ logDebug(s"Read partitioner from $partitionerFilePath")
+ Some(partitioner)
} catch {
+ case e: FileNotFoundException =>
+ logDebug("No partitioner file", e)
+ None
case NonFatal(e) =>
logWarning(s"Error reading partitioner from $checkpointDirPath, " +
s"partitioner will not be recovered which may lead to performance loss", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 74f187642a..b6d723c682 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
/** Clean up the files associated with the checkpoint data for this RDD. */
def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
checkpointPath(sc, rddId).foreach { path =>
- val fs = path.getFileSystem(sc.hadoopConfiguration)
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- logWarning(s"Error deleting ${path.toString()}")
- }
- }
+ path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
}
}
}
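The simplification above (and the matching ones in the Hive test suites, Checkpoint, and the YARN Client further down) leans on the FileSystem.delete(path, recursive) contract: a missing path yields a false return value rather than an exception, so an exists()-then-delete() guard is redundant, and the boolean result already says whether anything was removed. A small sketch, with the helper name and println standing in for real logging:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // delete() is safe to call on a path that may not exist; the return value
    // distinguishes "removed something" (true) from "nothing there" (false).
    def removeIfPresent(path: Path, conf: Configuration): Unit = {
      val fs = path.getFileSystem(conf)
      if (fs.delete(path, true)) {   // true = recursive
        println(s"Deleted $path")
      }
    }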
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7d0639117..ce7877469f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
*/
def start() {
if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
- throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+ throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
}
val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@ private[spark] class EventLoggingListener(
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"
- if (shouldOverwrite && fileSystem.exists(path)) {
+ if (shouldOverwrite && fileSystem.delete(path, true)) {
logWarning(s"Event log $path already exists. Overwriting...")
- if (!fileSystem.delete(path, true)) {
- logWarning(s"Error deleting $path")
- }
}
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@ private[spark] object EventLoggingListener extends Logging {
* @return input stream that holds one JSON record per line.
*/
def openEventLog(log: Path, fs: FileSystem): InputStream = {
- // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
- // IOException when a file does not exist, so try our best to throw a proper exception.
- if (!fs.exists(log)) {
- throw new FileNotFoundException(s"File $log does not exist.")
- }
-
val in = new BufferedInputStream(fs.open(log))
// Compression codec is encoded as an extension, e.g. app_123.lzf
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 2f07395edf..df13b32451 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.repl
-import java.io.{ByteArrayOutputStream, FilterInputStream, InputStream, IOException}
+import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream, IOException}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import java.nio.channels.Channels
@@ -147,10 +147,11 @@ class ExecutorClassLoader(
private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
pathInDirectory: String): InputStream = {
val path = new Path(directory, pathInDirectory)
- if (fileSystem.exists(path)) {
+ try {
fileSystem.open(path)
- } else {
- throw new ClassNotFoundException(s"Class file not found at path $path")
+ } catch {
+ case _: FileNotFoundException =>
+ throw new ClassNotFoundException(s"Class file not found at path $path")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755fd3..bec966b15e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.streaming.state
-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider(
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
- if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+ if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
}
logInfo("Aborted")
@@ -278,14 +278,12 @@ private[state] class HDFSBackedStateStoreProvider(
/** Initialize the store provider */
private def initialize(): Unit = {
- if (!fs.exists(baseDir)) {
+ try {
fs.mkdirs(baseDir)
- } else {
- if (!fs.isDirectory(baseDir)) {
+ } catch {
+ case e: IOException =>
throw new IllegalStateException(
- s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
- s"$baseDir already exists and is not a directory")
- }
+ s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
}
}
@@ -340,13 +338,16 @@ private[state] class HDFSBackedStateStoreProvider(
private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
val fileToRead = deltaFile(version)
- if (!fs.exists(fileToRead)) {
- throw new IllegalStateException(
- s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
- }
var input: DataInputStream = null
+ val sourceStream = try {
+ fs.open(fileToRead)
+ } catch {
+ case f: FileNotFoundException =>
+ throw new IllegalStateException(
+ s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+ }
try {
- input = decompressStream(fs.open(fileToRead))
+ input = decompressStream(sourceStream)
var eof = false
while(!eof) {
@@ -405,8 +406,6 @@ private[state] class HDFSBackedStateStoreProvider(
private def readSnapshotFile(version: Long): Option[MapType] = {
val fileToRead = snapshotFile(version)
- if (!fs.exists(fileToRead)) return None
-
val map = new MapType()
var input: DataInputStream = null
@@ -443,6 +442,9 @@ private[state] class HDFSBackedStateStoreProvider(
}
logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
Some(map)
+ } catch {
+ case _: FileNotFoundException =>
+ None
} finally {
if (input != null) input.close()
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index e73117c814..061c7431a6 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -75,9 +75,7 @@ public class JavaMetastoreDataSourcesSuite {
hiveManagedPath = new Path(
catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
- if (fs.exists(hiveManagedPath)){
- fs.delete(hiveManagedPath, true);
- }
+ fs.delete(hiveManagedPath, true);
List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c36b0275f4..3892fe87e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -375,7 +375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
- if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+ fs.delete(filesystemPath, true)
// It is a managed table when we do not specify the location.
sql(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 398fa6500f..5cbad8bf3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
- if (fs.exists(path)) {
+ try {
val statuses = fs.listStatus(path)
if (statuses != null) {
val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@ object Checkpoint extends Logging {
logWarning(s"Listing $path returned null")
Seq.empty
}
- } else {
- logWarning(s"Checkpoint directory $path does not exist")
- Seq.empty
+ } catch {
+ case _: FileNotFoundException =>
+ logWarning(s"Checkpoint directory $path does not exist")
+ Seq.empty
}
}
@@ -229,9 +230,7 @@ class CheckpointWriter(
logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
// Write checkpoint to temp file
- if (fs.exists(tempFile)) {
- fs.delete(tempFile, true) // just in case it exists
- }
+ fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
Utils.tryWithSafeFinally {
fos.write(bytes)
@@ -242,9 +241,7 @@ class CheckpointWriter(
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true) // just in case it exists
- }
+ fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) {
logWarning(s"Could not rename $checkpointFile to $backupFile")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f01b8..845f554308 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.streaming.util
+import java.io.FileNotFoundException
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) &&
- fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
- val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
- pastLogs.clear()
- pastLogs ++= logFileInfo
- logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
- logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ try {
+ // If you call listStatus(file) it returns a stat of the file in the array,
+ // rather than an array listing all the children.
+ // This makes it hard to differentiate listStatus(file) and
+ // listStatus(dir-with-one-child) except by examining the name of the returned status,
+ // and once you've got symlinks in the mix that differentiation isn't easy.
+ // Checking for the path being a directory is one more call to the filesystem, but
+ // leads to much clearer code.
+ if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+ val logFileInfo = logFilesTologInfo(
+ fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ } catch {
+ case _: FileNotFoundException =>
+ // there is no log directory, hence nothing to recover
}
}
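The comment in the hunk above points at a quirk of FileSystem.listStatus: called on a plain file it returns a one-element array containing that file's own status, so the result alone cannot distinguish a file from a directory holding a single child. A brief sketch of the clearer shape the patch settles on (the helper name is illustrative, not part of the change):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Ask isDirectory explicitly (one extra filesystem call) instead of trying to
    // infer the path's nature from the shape of listStatus' result.
    def childPaths(fs: FileSystem, dir: Path): Seq[Path] = {
      val status = fs.getFileStatus(dir)   // throws FileNotFoundException if absent
      if (status.isDirectory) fs.listStatus(dir).map(_.getPath).toSeq
      else Seq.empty
    }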
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 13a765d035..6a3b3200dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -44,18 +44,16 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
- if (dfs.isFile(dfsPath)) {
- try {
- dfs.open(dfsPath)
- } catch {
- case e: IOException =>
- // If we are really unlucky, the file may be deleted as we're opening the stream.
- // This can happen as clean up is performed by daemon threads that may be left over from
- // previous runs.
- if (!dfs.isFile(dfsPath)) null else throw e
- }
- } else {
- null
+ try {
+ dfs.open(dfsPath)
+ } catch {
+ case _: FileNotFoundException =>
+ null
+ case e: IOException =>
+ // If we are really unlucky, the file may be deleted as we're opening the stream.
+ // This can happen as clean up is performed by daemon threads that may be left over from
+ // previous runs.
+ if (!dfs.isFile(dfsPath)) null else throw e
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e3572d781b..93684005f1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -189,9 +189,8 @@ private[spark] class Client(
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
val fs = stagingDirPath.getFileSystem(hadoopConf)
- if (!preserveFiles && fs.exists(stagingDirPath)) {
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
+ if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+ logInfo(s"Deleted staging directory $stagingDirPath")
}
} catch {
case ioe: IOException =>