aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSteve Loughran <stevel@apache.org>2016-08-17 11:42:57 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-17 11:43:01 -0700
commitcc97ea188e1d5b8e851d1a8438b8af092783ec04 (patch)
tree6128ca95597fd4765608e986f501dafcbbccf464 /core
parent4d92af310ad29ade039e4130f91f2a3d9180deef (diff)
downloadspark-cc97ea188e1d5b8e851d1a8438b8af092783ec04.tar.gz
spark-cc97ea188e1d5b8e851d1a8438b8af092783ec04.tar.bz2
spark-cc97ea188e1d5b8e851d1a8438b8af092783ec04.zip
[SPARK-16736][CORE][SQL] purge superfluous fs calls
A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` code, then removing uses of the calls when superfluous. 1. delete is harmless if called on a nonexistent path, so don't do any checks before deletes 1. any `FileSystem.exists()` check before `getFileStatus()` or `open()` is superfluous as the operation itself does the check. Instead the `FileNotFoundException` is caught and triggers the downgraded path. When a `FileNotFoundException` was thrown before, the code still creates a new FNFE with the error messages. Though now the inner exceptions are nested, for easier diagnostics. Initially, relying on Jenkins test runs. One troublespot here is that some of the codepaths are clearly error situations; it's not clear that they have coverage anyway. Trying to create the failure conditions in tests would be ideal, but it will also be hard. Author: Steve Loughran <stevel@apache.org> Closes #14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala13
5 files changed, 28 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a6853fe398..60f042f1e0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1410,9 +1410,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val scheme = new URI(schemeCorrectedPath).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
- if (!fs.exists(hadoopPath)) {
- throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
- }
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bc09935f93..6874aa5f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def startPolling(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
- if (!fs.exists(path)) {
- var msg = s"Log directory specified does not exist: $logDir"
- if (logDir == DEFAULT_LOG_DIR) {
- msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+ try {
+ if (!fs.getFileStatus(path).isDirectory) {
+ throw new IllegalArgumentException(
+ "Logging directory specified is not a directory: %s".format(logDir))
}
- throw new IllegalArgumentException(msg)
- }
- if (!fs.getFileStatus(path).isDirectory) {
- throw new IllegalArgumentException(
- "Logging directory specified is not a directory: %s".format(logDir))
+ } catch {
+ case f: FileNotFoundException =>
+ var msg = s"Log directory specified does not exist: $logDir"
+ if (logDir == DEFAULT_LOG_DIR) {
+ msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+ }
+ throw new FileNotFoundException(msg).initCause(f)
}
// Disable the background thread during tests.
@@ -495,12 +497,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
attemptsToClean.foreach { attempt =>
try {
- val path = new Path(logDir, attempt.logPath)
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- logWarning(s"Error deleting ${path}")
- }
- }
+ fs.delete(new Path(logDir, attempt.logPath), true)
} catch {
case e: AccessControlException =>
logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fddb935301..ab6554fd8a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val tempOutputPath =
new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
- if (fs.exists(tempOutputPath)) {
- throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
- }
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
- if (fs.exists(partitionerFilePath)) {
- val fileInputStream = fs.open(partitionerFilePath, bufferSize)
- val serializer = SparkEnv.get.serializer.newInstance()
- val deserializeStream = serializer.deserializeStream(fileInputStream)
- val partitioner = Utils.tryWithSafeFinally[Partitioner] {
- deserializeStream.readObject[Partitioner]
- } {
- deserializeStream.close()
- }
- logDebug(s"Read partitioner from $partitionerFilePath")
- Some(partitioner)
- } else {
- logDebug("No partitioner file")
- None
+ val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+ val serializer = SparkEnv.get.serializer.newInstance()
+ val deserializeStream = serializer.deserializeStream(fileInputStream)
+ val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+ deserializeStream.readObject[Partitioner]
+ } {
+ deserializeStream.close()
}
+ logDebug(s"Read partitioner from $partitionerFilePath")
+ Some(partitioner)
} catch {
+ case e: FileNotFoundException =>
+ logDebug("No partitioner file", e)
+ None
case NonFatal(e) =>
logWarning(s"Error reading partitioner from $checkpointDirPath, " +
s"partitioner will not be recovered which may lead to performance loss", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 74f187642a..b6d723c682 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
/** Clean up the files associated with the checkpoint data for this RDD. */
def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
checkpointPath(sc, rddId).foreach { path =>
- val fs = path.getFileSystem(sc.hadoopConfiguration)
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- logWarning(s"Error deleting ${path.toString()}")
- }
- }
+ path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7d0639117..ce7877469f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
*/
def start() {
if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
- throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+ throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
}
val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@ private[spark] class EventLoggingListener(
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"
- if (shouldOverwrite && fileSystem.exists(path)) {
+ if (shouldOverwrite && fileSystem.delete(path, true)) {
logWarning(s"Event log $path already exists. Overwriting...")
- if (!fileSystem.delete(path, true)) {
- logWarning(s"Error deleting $path")
- }
}
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@ private[spark] object EventLoggingListener extends Logging {
* @return input stream that holds one JSON record per line.
*/
def openEventLog(log: Path, fs: FileSystem): InputStream = {
- // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
- // IOException when a file does not exist, so try our best to throw a proper exception.
- if (!fs.exists(log)) {
- throw new FileNotFoundException(s"File $log does not exist.")
- }
-
val in = new BufferedInputStream(fs.open(log))
// Compression codec is encoded as an extension, e.g. app_123.lzf