author     Yinan Li <liyinan926@gmail.com>  2014-01-17 17:27:25 -0800
committer  Yinan Li <liyinan926@gmail.com>  2014-01-18 15:26:59 -0800
commit     fd833e7ab1bf006c5ae1dff4767b02729e1bbfa7 (patch)
tree       cd67e9926fc20a5402d0e4b1da33ef2873188d75
parent     aa981e4e97a11dbd5a4d012bfbdb395982968372 (diff)
Allow files added through SparkContext.addFile() to be overwritten
This is useful for cases where a file needs to be refreshed and re-downloaded by the executors periodically.

Signed-off-by: Yinan Li <liyinan926@gmail.com>
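As a quick illustration (not part of this patch), an application would opt in by setting the property before creating its context; the property name comes from the change below, while the app name, master URL, and file path are hypothetical:

import org.apache.spark.SparkContext

object AddFileOverwriteExample {
  def main(args: Array[String]): Unit = {
    // Utils.fetchFile consults spark.files.overwrite when a fetched file's
    // contents differ from an existing local copy (see the diff below).
    System.setProperty("spark.files.overwrite", "true")

    val sc = new SparkContext("local[2]", "AddFileOverwriteExample")
    // Distribute a file whose contents may change between fetches.
    sc.addFile("/tmp/lookup-table.csv")
    sc.stop()
  }
}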
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 49
-rw-r--r--  docs/configuration.md | 7
2 files changed, 41 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa9bf4c92..e1f8e9520c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -268,6 +268,7 @@ private[spark] object Utils extends Logging {
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
+    val fileOverwrite = System.getProperty("spark.files.overwrite", "false").toBoolean
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
         logInfo("Fetching " + url + " to " + tempFile)
@@ -275,47 +276,65 @@ private[spark] object Utils extends Logging {
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException(
-            "File " + targetFile + " exists and does not match contents of" + " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
+          if (fileOverwrite) {
+            targetFile.delete()
+            logInfo(("File %s exists and does not match contents of %s, " +
+              "replacing it with %s").format(targetFile, url, url))
+          } else {
+            tempFile.delete()
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          }
         }
+        Files.move(tempFile, targetFile)
       case "file" | null =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        var shouldCopy = true
         if (targetFile.exists) {
-          // If the target file already exists, warn the user if
           if (!Files.equal(sourceFile, targetFile)) {
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
+            if (fileOverwrite) {
+              targetFile.delete()
+              logInfo(("File %s exists and does not match contents of %s, " +
+                "replacing it with %s").format(targetFile, url, url))
+            } else {
+              throw new SparkException(
+                "File " + targetFile + " exists and does not match contents of" + " " + url)
+            }
           } else {
             // Do nothing if the file contents are the same, i.e. this file has been copied
             // previously.
             logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
               + targetFile.getAbsolutePath)
+            shouldCopy = false
           }
-        } else {
+        }
+
+        if (shouldCopy) {
           // The file does not exist in the target directory. Copy it there.
           logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
           Files.copy(sourceFile, targetFile)
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val uri = new URI(url)
         val conf = SparkHadoopUtil.get.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
+          if (fileOverwrite) {
+            targetFile.delete()
+            logInfo(("File %s exists and does not match contents of %s, " +
+              "replacing it with %s").format(targetFile, url, url))
+          } else {
+            tempFile.delete()
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          }
         }
+        Files.move(tempFile, targetFile)
     }
     // Decompress the file if it's a .tar or .tar.gz
     if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
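All three scheme branches now share the same compare-then-replace shape. The following standalone sketch restates that pattern with the same Guava helpers the patch uses (com.google.common.io.Files); the method name and the plain RuntimeException are illustrative, not Spark API:

import java.io.File
import com.google.common.io.Files

// tempFile holds a freshly downloaded copy; it replaces targetFile only if
// overwriting is allowed or the two files are already identical.
def moveCheckingOverwrite(tempFile: File, targetFile: File, url: String,
    fileOverwrite: Boolean): Unit = {
  if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
    if (fileOverwrite) {
      // Stale copy: drop it so the move below installs the new download.
      targetFile.delete()
    } else {
      // Refuse to clobber a differing file when overwriting is disabled.
      tempFile.delete()
      throw new RuntimeException(
        "File " + targetFile + " exists and does not match contents of " + url)
    }
  }
  // At this point the target is absent, identical, or just deleted.
  Files.move(tempFile, targetFile)
}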
diff --git a/docs/configuration.md b/docs/configuration.md
index da70cabba2..3b565e4347 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -459,6 +459,13 @@ Apart from these, the following properties are also available, and may be useful
     the whole cluster by default. <br/>
     <b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
     applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+</td>
+</tr>
+<tr>
+  <td>spark.files.overwrite</td>
+  <td>false</td>
+  <td>
+    Whether to overwrite files added through <code>SparkContext.addFile()</code> when the target file exists and its contents do not match those of the source.
 </td>
 </tr>
 </table>
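For the periodic-refresh use case from the commit message, tasks resolve their local copy through SparkFiles.get. A hedged sketch, assuming an existing SparkContext sc and a hypothetical file name:

import org.apache.spark.SparkFiles

// With spark.files.overwrite=true, a later fetch may replace this local copy
// when the source contents change, so tasks pick up the refreshed version.
val lineCounts = sc.parallelize(1 to 4).map { _ =>
  val localPath = SparkFiles.get("lookup-table.csv")
  scala.io.Source.fromFile(localPath).getLines().length
}.collect()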