aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRyan Williams <ryan.blake.williams@gmail.com>2014-12-19 15:24:41 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-19 15:24:41 -0800
commit7981f969762e77f1752ef8f86c546d4fc32a1a4f (patch)
treeee8c8ddb115abb2ce2a38c14f217580381ed6b22 /core
parentcdb2c645ab769a8678dd81cff44a809fcfa4420b (diff)
downloadspark-7981f969762e77f1752ef8f86c546d4fc32a1a4f.tar.gz
spark-7981f969762e77f1752ef8f86c546d4fc32a1a4f.tar.bz2
spark-7981f969762e77f1752ef8f86c546d4fc32a1a4f.zip
[SPARK-4896] don’t redundantly overwrite executor JAR deps
Author: Ryan Williams <ryan.blake.williams@gmail.com> Closes #2848 from ryan-williams/fetch-file and squashes the following commits: c14daff [Ryan Williams] Fix copy that was changed to a move inadvertently 8e39c16 [Ryan Williams] code review feedback 788ed41 [Ryan Williams] don’t redundantly overwrite executor JAR deps
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala170
1 files changed, 107 insertions, 63 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d16233a0bc..5e1cb0c7a7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
} finally {
lock.release()
}
- if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
- if (conf.getBoolean("spark.files.overwrite", false)) {
- targetFile.delete()
- logInfo((s"File $targetFile exists and does not match contents of $url, " +
- s"replacing it with $url"))
- } else {
- throw new SparkException(s"File $targetFile exists and does not match contents of $url")
- }
- }
- Files.copy(cachedFile, targetFile)
+ copyFile(
+ url,
+ cachedFile,
+ targetFile,
+ conf.getBoolean("spark.files.overwrite", false)
+ )
} else {
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
}
@@ -412,6 +408,104 @@ private[spark] object Utils extends Logging {
}
/**
+ * Download `in` to `tempFile`, then move it to `destFile`.
+ *
+ * If `destFile` already exists:
+ * - no-op if its contents equal those of `sourceFile`,
+ * - throw an exception if `fileOverwrite` is false,
+ * - attempt to overwrite it otherwise.
+ *
+ * @param url URL that `sourceFile` originated from, for logging purposes.
+ * @param in InputStream to download.
+ * @param tempFile File path to download `in` to.
+ * @param destFile File path to move `tempFile` to.
+ * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
+ * `sourceFile`
+ */
+ private def downloadFile(
+ url: String,
+ in: InputStream,
+ tempFile: File,
+ destFile: File,
+ fileOverwrite: Boolean): Unit = {
+
+ try {
+ val out = new FileOutputStream(tempFile)
+ Utils.copyStream(in, out, closeStreams = true)
+ copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
+ } finally {
+ // Catch-all for the couple of cases where for some reason we didn't move `tempFile` to
+ // `destFile`.
+ if (tempFile.exists()) {
+ tempFile.delete()
+ }
+ }
+ }
+
+ /**
+ * Copy `sourceFile` to `destFile`.
+ *
+ * If `destFile` already exists:
+ * - no-op if its contents equal those of `sourceFile`,
+ * - throw an exception if `fileOverwrite` is false,
+ * - attempt to overwrite it otherwise.
+ *
+ * @param url URL that `sourceFile` originated from, for logging purposes.
+ * @param sourceFile File path to copy/move from.
+ * @param destFile File path to copy/move to.
+ * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
+ * `sourceFile`
+ * @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to
+ * `destFile`.
+ */
+ private def copyFile(
+ url: String,
+ sourceFile: File,
+ destFile: File,
+ fileOverwrite: Boolean,
+ removeSourceFile: Boolean = false): Unit = {
+
+ if (destFile.exists) {
+ if (!Files.equal(sourceFile, destFile)) {
+ if (fileOverwrite) {
+ logInfo(
+ s"File $destFile exists and does not match contents of $url, replacing it with $url"
+ )
+ if (!destFile.delete()) {
+ throw new SparkException(
+ "Failed to delete %s while attempting to overwrite it with %s".format(
+ destFile.getAbsolutePath,
+ sourceFile.getAbsolutePath
+ )
+ )
+ }
+ } else {
+ throw new SparkException(
+ s"File $destFile exists and does not match contents of $url")
+ }
+ } else {
+ // Do nothing if the file contents are the same, i.e. this file has been copied
+ // previously.
+ logInfo(
+ "%s has been previously copied to %s".format(
+ sourceFile.getAbsolutePath,
+ destFile.getAbsolutePath
+ )
+ )
+ return
+ }
+ }
+
+ // The file does not exist in the target directory. Copy or move it there.
+ if (removeSourceFile) {
+ Files.move(sourceFile, destFile)
+ } else {
+ logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
+ Files.copy(sourceFile, destFile)
+ }
+ }
+
+ /**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
@@ -449,67 +543,17 @@ private[spark] object Utils extends Logging {
uc.setReadTimeout(timeout)
uc.connect()
val in = uc.getInputStream()
- val out = new FileOutputStream(tempFile)
- Utils.copyStream(in, out, closeStreams = true)
- if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- if (fileOverwrite) {
- targetFile.delete()
- logInfo(("File %s exists and does not match contents of %s, " +
- "replacing it with %s").format(targetFile, url, url))
- } else {
- tempFile.delete()
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
- }
- }
- Files.move(tempFile, targetFile)
+ downloadFile(url, in, tempFile, targetFile, fileOverwrite)
case "file" =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
- var shouldCopy = true
- if (targetFile.exists) {
- if (!Files.equal(sourceFile, targetFile)) {
- if (fileOverwrite) {
- targetFile.delete()
- logInfo(("File %s exists and does not match contents of %s, " +
- "replacing it with %s").format(targetFile, url, url))
- } else {
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
- }
- } else {
- // Do nothing if the file contents are the same, i.e. this file has been copied
- // previously.
- logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
- + targetFile.getAbsolutePath)
- shouldCopy = false
- }
- }
-
- if (shouldCopy) {
- // The file does not exist in the target directory. Copy it there.
- logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
- Files.copy(sourceFile, targetFile)
- }
+ copyFile(url, sourceFile, targetFile, fileOverwrite)
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
- val out = new FileOutputStream(tempFile)
- Utils.copyStream(in, out, closeStreams = true)
- if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- if (fileOverwrite) {
- targetFile.delete()
- logInfo(("File %s exists and does not match contents of %s, " +
- "replacing it with %s").format(targetFile, url, url))
- } else {
- tempFile.delete()
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
- }
- }
- Files.move(tempFile, targetFile)
+ downloadFile(url, in, tempFile, targetFile, fileOverwrite)
}
}