diff options
author | Ryan Williams <ryan.blake.williams@gmail.com> | 2014-12-19 15:24:41 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-19 15:24:41 -0800 |
commit | 7981f969762e77f1752ef8f86c546d4fc32a1a4f (patch) | |
tree | ee8c8ddb115abb2ce2a38c14f217580381ed6b22 /core | |
parent | cdb2c645ab769a8678dd81cff44a809fcfa4420b (diff) | |
download | spark-7981f969762e77f1752ef8f86c546d4fc32a1a4f.tar.gz spark-7981f969762e77f1752ef8f86c546d4fc32a1a4f.tar.bz2 spark-7981f969762e77f1752ef8f86c546d4fc32a1a4f.zip |
[SPARK-4896] don’t redundantly overwrite executor JAR deps
Author: Ryan Williams <ryan.blake.williams@gmail.com>
Closes #2848 from ryan-williams/fetch-file and squashes the following commits:
c14daff [Ryan Williams] Fix copy that was changed to a move inadvertently
8e39c16 [Ryan Williams] code review feedback
788ed41 [Ryan Williams] don’t redundantly overwrite executor JAR deps
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 170 |
1 files changed, 107 insertions, 63 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d16233a0bc..5e1cb0c7a7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -385,16 +385,12 @@ private[spark] object Utils extends Logging { } finally { lock.release() } - if (targetFile.exists && !Files.equal(cachedFile, targetFile)) { - if (conf.getBoolean("spark.files.overwrite", false)) { - targetFile.delete() - logInfo((s"File $targetFile exists and does not match contents of $url, " + - s"replacing it with $url")) - } else { - throw new SparkException(s"File $targetFile exists and does not match contents of $url") - } - } - Files.copy(cachedFile, targetFile) + copyFile( + url, + cachedFile, + targetFile, + conf.getBoolean("spark.files.overwrite", false) + ) } else { doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf) } @@ -412,6 +408,104 @@ private[spark] object Utils extends Logging { } /** + * Download `in` to `tempFile`, then move it to `destFile`. + * + * If `destFile` already exists: + * - no-op if its contents equal those of `sourceFile`, + * - throw an exception if `fileOverwrite` is false, + * - attempt to overwrite it otherwise. + * + * @param url URL that `sourceFile` originated from, for logging purposes. + * @param in InputStream to download. + * @param tempFile File path to download `in` to. + * @param destFile File path to move `tempFile` to. + * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match + * `sourceFile` + */ + private def downloadFile( + url: String, + in: InputStream, + tempFile: File, + destFile: File, + fileOverwrite: Boolean): Unit = { + + try { + val out = new FileOutputStream(tempFile) + Utils.copyStream(in, out, closeStreams = true) + copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true) + } finally { + // Catch-all for the couple of cases where for some reason we didn't move `tempFile` to + // `destFile`. + if (tempFile.exists()) { + tempFile.delete() + } + } + } + + /** + * Copy `sourceFile` to `destFile`. + * + * If `destFile` already exists: + * - no-op if its contents equal those of `sourceFile`, + * - throw an exception if `fileOverwrite` is false, + * - attempt to overwrite it otherwise. + * + * @param url URL that `sourceFile` originated from, for logging purposes. + * @param sourceFile File path to copy/move from. + * @param destFile File path to copy/move to. + * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match + * `sourceFile` + * @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to + * `destFile`. + */ + private def copyFile( + url: String, + sourceFile: File, + destFile: File, + fileOverwrite: Boolean, + removeSourceFile: Boolean = false): Unit = { + + if (destFile.exists) { + if (!Files.equal(sourceFile, destFile)) { + if (fileOverwrite) { + logInfo( + s"File $destFile exists and does not match contents of $url, replacing it with $url" + ) + if (!destFile.delete()) { + throw new SparkException( + "Failed to delete %s while attempting to overwrite it with %s".format( + destFile.getAbsolutePath, + sourceFile.getAbsolutePath + ) + ) + } + } else { + throw new SparkException( + s"File $destFile exists and does not match contents of $url") + } + } else { + // Do nothing if the file contents are the same, i.e. this file has been copied + // previously. + logInfo( + "%s has been previously copied to %s".format( + sourceFile.getAbsolutePath, + destFile.getAbsolutePath + ) + ) + return + } + } + + // The file does not exist in the target directory. Copy or move it there. + if (removeSourceFile) { + Files.move(sourceFile, destFile) + } else { + logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}") + Files.copy(sourceFile, destFile) + } + } + + /** * Download a file to target directory. Supports fetching the file in a variety of ways, * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter. * @@ -449,67 +543,17 @@ private[spark] object Utils extends Logging { uc.setReadTimeout(timeout) uc.connect() val in = uc.getInputStream() - val out = new FileOutputStream(tempFile) - Utils.copyStream(in, out, closeStreams = true) - if (targetFile.exists && !Files.equal(tempFile, targetFile)) { - if (fileOverwrite) { - targetFile.delete() - logInfo(("File %s exists and does not match contents of %s, " + - "replacing it with %s").format(targetFile, url, url)) - } else { - tempFile.delete() - throw new SparkException( - "File " + targetFile + " exists and does not match contents of" + " " + url) - } - } - Files.move(tempFile, targetFile) + downloadFile(url, in, tempFile, targetFile, fileOverwrite) case "file" => // In the case of a local file, copy the local file to the target directory. // Note the difference between uri vs url. val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url) - var shouldCopy = true - if (targetFile.exists) { - if (!Files.equal(sourceFile, targetFile)) { - if (fileOverwrite) { - targetFile.delete() - logInfo(("File %s exists and does not match contents of %s, " + - "replacing it with %s").format(targetFile, url, url)) - } else { - throw new SparkException( - "File " + targetFile + " exists and does not match contents of" + " " + url) - } - } else { - // Do nothing if the file contents are the same, i.e. this file has been copied - // previously. - logInfo(sourceFile.getAbsolutePath + " has been previously copied to " - + targetFile.getAbsolutePath) - shouldCopy = false - } - } - - if (shouldCopy) { - // The file does not exist in the target directory. Copy it there. - logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) - Files.copy(sourceFile, targetFile) - } + copyFile(url, sourceFile, targetFile, fileOverwrite) case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others val fs = getHadoopFileSystem(uri, hadoopConf) val in = fs.open(new Path(uri)) - val out = new FileOutputStream(tempFile) - Utils.copyStream(in, out, closeStreams = true) - if (targetFile.exists && !Files.equal(tempFile, targetFile)) { - if (fileOverwrite) { - targetFile.delete() - logInfo(("File %s exists and does not match contents of %s, " + - "replacing it with %s").format(targetFile, url, url)) - } else { - tempFile.delete() - throw new SparkException( - "File " + targetFile + " exists and does not match contents of" + " " + url) - } - } - Files.move(tempFile, targetFile) + downloadFile(url, in, tempFile, targetFile, fileOverwrite) } } |