diff options
author | Yinan Li <liyinan926@gmail.com> | 2014-01-17 17:27:25 -0800 |
---|---|---|
committer | Yinan Li <liyinan926@gmail.com> | 2014-01-18 15:26:59 -0800 |
commit | fd833e7ab1bf006c5ae1dff4767b02729e1bbfa7 (patch) | |
tree | cd67e9926fc20a5402d0e4b1da33ef2873188d75 /core | |
parent | aa981e4e97a11dbd5a4d012bfbdb395982968372 (diff) | |
download | spark-fd833e7ab1bf006c5ae1dff4767b02729e1bbfa7.tar.gz spark-fd833e7ab1bf006c5ae1dff4767b02729e1bbfa7.tar.bz2 spark-fd833e7ab1bf006c5ae1dff4767b02729e1bbfa7.zip |
Allow files added through SparkContext.addFile() to be overwritten
This is useful for the cases when a file needs to be refreshed and downloaded
by the executors periodically.
Signed-off-by: Yinan Li <liyinan926@gmail.com>
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 49 |
1 files changed, 34 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index caa9bf4c92..e1f8e9520c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -268,6 +268,7 @@ private[spark] object Utils extends Logging { val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir)) val targetFile = new File(targetDir, filename) val uri = new URI(url) + val fileOverwrite = System.getProperty("spark.files.overwrite", "false").toBoolean uri.getScheme match { case "http" | "https" | "ftp" => logInfo("Fetching " + url + " to " + tempFile) @@ -275,47 +276,65 @@ private[spark] object Utils extends Logging { val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) if (targetFile.exists && !Files.equal(tempFile, targetFile)) { - tempFile.delete() - throw new SparkException( - "File " + targetFile + " exists and does not match contents of" + " " + url) - } else { - Files.move(tempFile, targetFile) + if (fileOverwrite) { + targetFile.delete() + logInfo(("File %s exists and does not match contents of %s, " + + "replacing it with %s").format(targetFile, url, url)) + } else { + tempFile.delete() + throw new SparkException( + "File " + targetFile + " exists and does not match contents of" + " " + url) + } } + Files.move(tempFile, targetFile) case "file" | null => // In the case of a local file, copy the local file to the target directory. // Note the difference between uri vs url. val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url) + var shouldCopy = true if (targetFile.exists) { - // If the target file already exists, warn the user if if (!Files.equal(sourceFile, targetFile)) { - throw new SparkException( - "File " + targetFile + " exists and does not match contents of" + " " + url) + if (fileOverwrite) { + targetFile.delete() + logInfo(("File %s exists and does not match contents of %s, " + + "replacing it with %s").format(targetFile, url, url)) + } else { + throw new SparkException( + "File " + targetFile + " exists and does not match contents of" + " " + url) + } } else { // Do nothing if the file contents are the same, i.e. this file has been copied // previously. logInfo(sourceFile.getAbsolutePath + " has been previously copied to " + targetFile.getAbsolutePath) + shouldCopy = false } - } else { + } + + if (shouldCopy) { // The file does not exist in the target directory. Copy it there. logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) Files.copy(sourceFile, targetFile) } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others - val uri = new URI(url) val conf = SparkHadoopUtil.get.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) if (targetFile.exists && !Files.equal(tempFile, targetFile)) { - tempFile.delete() - throw new SparkException("File " + targetFile + " exists and does not match contents of" + - " " + url) - } else { - Files.move(tempFile, targetFile) + if (fileOverwrite) { + targetFile.delete() + logInfo(("File %s exists and does not match contents of %s, " + + "replacing it with %s").format(targetFile, url, url)) + } else { + tempFile.delete() + throw new SparkException( + "File " + targetFile + " exists and does not match contents of" + " " + url) + } } + Files.move(tempFile, targetFile) } // Decompress the file if it's a .tar or .tar.gz if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) { |