author     Reynold Xin <rxin@apache.org>    2014-01-27 17:08:35 -0800
committer  Reynold Xin <rxin@apache.org>    2014-01-27 17:08:35 -0800
commit     84670f2715392859624df290c1b52eb4ed4a9cb1
tree       5d3a95afe7e9e7527a271886a4b4dd1f44820434
parent     3d5c03e2305777b8a32f2e196e3b73ab221b3e79
parent     584323c6b13e0d4624eb39360d0caff6c8232aac
Merge pull request #466 from liyinan926/file-overwrite-new
Allow files added through SparkContext.addFile() to be overwritten.

This is useful when a file needs to be refreshed and re-downloaded by the executors periodically. One possible use case: the driver periodically renews a Hadoop delegation token and writes it to a token file, which the executors must download again each time it is renewed. The current implementation, however, throws an exception when the target file already exists and its contents do not match those of the new source. This PR adds a configuration option, spark.files.overwrite, that allows such files to be overwritten, supporting use cases like the one above.
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 49
-rw-r--r--  docs/configuration.md                                  |  7
2 files changed, 41 insertions(+), 15 deletions(-)
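As a rough illustration of the use case above, a driver can opt in to the new behavior and register a file that it later refreshes. This is a minimal sketch against the 0.9-era API; the application name and token-file path are hypothetical placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Opt in to overwriting fetched files; the default stays false, which
    // preserves the old fail-on-mismatch behavior.
    val conf = new SparkConf()
      .setAppName("delegation-token-refresh") // hypothetical app name
      .setMaster("local[*]")
      .set("spark.files.overwrite", "true")
    val sc = new SparkContext(conf)

    // Hypothetical path: the driver rewrites this file with a renewed
    // delegation token; executors replace their stale copies on fetch.
    sc.addFile("/tmp/hadoop-delegation.token")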
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8447773343..861ad62f9f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -247,6 +247,7 @@ private[spark] object Utils extends Logging {
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
+ val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
uri.getScheme match {
case "http" | "https" | "ftp" =>
logInfo("Fetching " + url + " to " + tempFile)
@@ -254,47 +255,65 @@ private[spark] object Utils extends Logging {
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- tempFile.delete()
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
- } else {
- Files.move(tempFile, targetFile)
+ if (fileOverwrite) {
+ targetFile.delete()
+ logInfo(("File %s exists and does not match contents of %s, " +
+ "replacing it with %s").format(targetFile, url, url))
+ } else {
+ tempFile.delete()
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
+ }
}
+ Files.move(tempFile, targetFile)
case "file" | null =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+ var shouldCopy = true
if (targetFile.exists) {
- // If the target file already exists, warn the user if
if (!Files.equal(sourceFile, targetFile)) {
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
+ if (fileOverwrite) {
+ targetFile.delete()
+ logInfo(("File %s exists and does not match contents of %s, " +
+ "replacing it with %s").format(targetFile, url, url))
+ } else {
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
+ }
} else {
// Do nothing if the file contents are the same, i.e. this file has been copied
// previously.
logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+ targetFile.getAbsolutePath)
+ shouldCopy = false
}
- } else {
+ }
+
+ if (shouldCopy) {
// The file does not exist in the target directory. Copy it there.
logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
Files.copy(sourceFile, targetFile)
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
- val uri = new URI(url)
val conf = SparkHadoopUtil.get.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- tempFile.delete()
- throw new SparkException("File " + targetFile + " exists and does not match contents of" +
- " " + url)
- } else {
- Files.move(tempFile, targetFile)
+ if (fileOverwrite) {
+ targetFile.delete()
+ logInfo(("File %s exists and does not match contents of %s, " +
+ "replacing it with %s").format(targetFile, url, url))
+ } else {
+ tempFile.delete()
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
+ }
}
+ Files.move(tempFile, targetFile)
}
// Decompress the file if it's a .tar or .tar.gz
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
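The fetch-based branches above (http/https/ftp and the Hadoop filesystem case) now share one pattern; the following is a simplified, non-verbatim restatement of that control flow, where resolveTarget is a hypothetical helper name rather than anything in the patch.

    import java.io.File
    import com.google.common.io.Files
    import org.apache.spark.SparkException

    // Decide what to do with a freshly fetched tempFile when targetFile may
    // already exist, mirroring the patched logic in Utils.fetchFile.
    def resolveTarget(tempFile: File, targetFile: File, url: String,
                      fileOverwrite: Boolean): Unit = {
      if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
        if (fileOverwrite) {
          // Stale copy: remove it so the fresh contents can be moved in.
          targetFile.delete()
        } else {
          // Old behavior: refuse to clobber a mismatched file.
          tempFile.delete()
          throw new SparkException(
            "File " + targetFile + " exists and does not match contents of " + url)
        }
      }
      // Target was absent, identical, or has just been deleted.
      Files.move(tempFile, targetFile)
    }

The local-file branch differs only in that it copies the source file in place with Files.copy and skips the copy entirely when the contents already match.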
diff --git a/docs/configuration.md b/docs/configuration.md
index 3bb655075f..5c4714dc24 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -454,6 +454,13 @@ Apart from these, the following properties are also available, and may be useful
the whole cluster by default. <br/>
<b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+</td>
+</tr>
+<tr>
+ <td>spark.files.overwrite</td>
+ <td>false</td>
+ <td>
+ Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.
</td>
</tr>
</table>
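For completeness, the documented default above matches the getBoolean lookup the patch adds to Utils.fetchFile; a minimal sanity check using only the public SparkConf API might look like this.

    import org.apache.spark.SparkConf

    // With no explicit setting, the flag reads as false.
    val conf = new SparkConf(loadDefaults = false)
    assert(!conf.getBoolean("spark.files.overwrite", false))

    // Once enabled, mismatched files are replaced on fetch instead of
    // triggering a SparkException.
    conf.set("spark.files.overwrite", "true")
    assert(conf.getBoolean("spark.files.overwrite", false))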