aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2012-12-28 16:13:23 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2012-12-28 17:00:57 -0800
commitf1bf4f0385a8e5da14a1d4b01bbbea17b98c4aa3 (patch)
tree37d8c1be628a7f9a9b151fa79ff817490714c8b6 /core
parent84587a9bf3c734be151251b97ac5af48eb03f4d9 (diff)
downloadspark-f1bf4f0385a8e5da14a1d4b01bbbea17b98c4aa3.tar.gz
spark-f1bf4f0385a8e5da14a1d4b01bbbea17b98c4aa3.tar.bz2
spark-f1bf4f0385a8e5da14a1d4b01bbbea17b98c4aa3.zip
Skip deletion of files in clearFiles().
This fixes an issue where Spark could delete original files in the current working directory that were added to the job using addFile(). There was also the potential for addFile() to overwrite local files, which is addressed by changing Utils.fetchFile() to log a warning instead of overwriting a file with new contents. This is a short-term fix; a better long-term solution would be to remove the dependence on storing files in the current working directory, since we can't change the cwd from Java.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/SparkContext.scala9
-rw-r--r--core/src/main/scala/spark/Utils.scala57
2 files changed, 46 insertions, 20 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0afab522af..4fd81bc63b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -419,8 +419,9 @@ class SparkContext(
}
addedFiles(key) = System.currentTimeMillis
- // Fetch the file locally in case the task is executed locally
- val filename = new File(path.split("/").last)
+ // Fetch the file locally in case a job is executed locally.
+ // Jobs that run through LocalScheduler will already fetch the required dependencies,
+ // but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here.
Utils.fetchFile(path, new File("."))
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
@@ -437,11 +438,10 @@ class SparkContext(
}
/**
- * Clear the job's list of files added by `addFile` so that they do not get donwloaded to
+ * Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
def clearFiles() {
- addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedFiles.clear()
}
@@ -465,7 +465,6 @@ class SparkContext(
* any new nodes.
*/
def clearJars() {
- addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedJars.clear()
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 6d64b32174..c10b415a93 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
+import com.google.common.io.Files
/**
* Various utility methods used by Spark.
@@ -130,28 +131,47 @@ private object Utils extends Logging {
*/
def fetchFile(url: String, targetDir: File) {
val filename = url.split("/").last
+ val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+ val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
uri.getScheme match {
case "http" | "https" | "ftp" =>
- logInfo("Fetching " + url + " to " + targetFile)
+ logInfo("Fetching " + url + " to " + tempFile)
val in = new URL(url).openStream()
- val out = new FileOutputStream(targetFile)
+ val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
+ if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+ logWarning("File " + targetFile + " exists and does not match contents of " + url +
+ "; using existing version")
+ tempFile.delete()
+ } else {
+ Files.move(tempFile, targetFile)
+ }
case "file" | null =>
- // Remove the file if it already exists
- targetFile.delete()
- // Symlink the file locally.
- if (uri.isAbsolute) {
- // url is absolute, i.e. it starts with "file:///". Extract the source
- // file's absolute path from the url.
- val sourceFile = new File(uri)
- logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
- FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+ val sourceFile = if (uri.isAbsolute) {
+ new File(uri)
+ } else {
+ new File(url)
+ }
+ if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
+ logWarning("File " + targetFile + " exists and does not match contents of " + url +
+ "; using existing version")
} else {
- // url is not absolute, i.e. itself is the path to the source file.
- logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
- FileUtil.symLink(url, targetFile.getAbsolutePath)
+ // Remove the file if it already exists
+ targetFile.delete()
+ // Symlink the file locally.
+ if (uri.isAbsolute) {
+ // url is absolute, i.e. it starts with "file:///". Extract the source
+ // file's absolute path from the url.
+ val sourceFile = new File(uri)
+ logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+ FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+ } else {
+ // url is not absolute, i.e. itself is the path to the source file.
+ logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
+ FileUtil.symLink(url, targetFile.getAbsolutePath)
+ }
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
@@ -159,8 +179,15 @@ private object Utils extends Logging {
val conf = new Configuration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
- val out = new FileOutputStream(targetFile)
+ val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
+ if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+ logWarning("File " + targetFile + " exists and does not match contents of " + url +
+ "; using existing version")
+ tempFile.delete()
+ } else {
+ Files.move(tempFile, targetFile)
+ }
}
// Decompress the file if it's a .tar or .tar.gz
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {