path: root/core/src/main/scala/spark/Utils.scala
author    Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-12-28 16:13:23 -0800
committer Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-12-28 17:00:57 -0800
commit    f1bf4f0385a8e5da14a1d4b01bbbea17b98c4aa3 (patch)
tree      37d8c1be628a7f9a9b151fa79ff817490714c8b6 /core/src/main/scala/spark/Utils.scala
parent    84587a9bf3c734be151251b97ac5af48eb03f4d9 (diff)
Skip deletion of files in clearFiles().
This fixes an issue where Spark could delete original files in the current working directory that were added to the job using addFile().

There was also the potential for addFile() to overwrite local files, which is addressed by changing Utils.fetchFile() to log a warning instead of overwriting a file with new contents.

This is a short-term fix; a better long-term solution would be to remove the dependence on storing files in the current working directory, since we can't change the cwd from Java.
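For readers skimming the change, the new overwrite guard boils down to the pattern below: fetch into a temporary file first, and only move it into place when doing so cannot clobber a different existing file; a conflicting existing file is kept and a warning is logged instead. This is a minimal standalone sketch rather than the code from Utils.scala itself: the object and method names are invented for illustration, Files.equal and Files.move are Guava's com.google.common.io.Files helpers as used in the patch, and println stands in for Spark's logWarning.

import java.io.File
import com.google.common.io.Files

object FetchGuardSketch {
  // Move a freshly downloaded temp file into place, refusing to clobber a
  // target that already exists with different contents (mirrors the guard
  // this commit adds around the downloads in Utils.fetchFile).
  def moveIfSafe(tempFile: File, targetFile: File, url: String): Unit = {
    if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
      // Utils.fetchFile calls logWarning here; println stands in for the sketch.
      println("File " + targetFile + " exists and does not match contents of " + url +
        "; using existing version")
      tempFile.delete()
    } else {
      // Target is missing or already identical, so the move is safe.
      Files.move(tempFile, targetFile)
    }
  }
}

When the target already exists with identical contents, the move simply replaces it with the same bytes; only a genuine content mismatch is treated as a conflict.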
Diffstat (limited to 'core/src/main/scala/spark/Utils.scala')
-rw-r--r--  core/src/main/scala/spark/Utils.scala  |  57
1 file changed, 42 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 6d64b32174..c10b415a93 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.io.Source
+import com.google.common.io.Files
 
 /**
  * Various utility methods used by Spark.
@@ -130,28 +131,47 @@ private object Utils extends Logging {
    */
   def fetchFile(url: String, targetDir: File) {
     val filename = url.split("/").last
+    val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + targetFile)
+        logInfo("Fetching " + url + " to " + tempFile)
         val in = new URL(url).openStream()
-        val out = new FileOutputStream(targetFile)
+        val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
+        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+          logWarning("File " + targetFile + " exists and does not match contents of " + url +
+            "; using existing version")
+          tempFile.delete()
+        } else {
+          Files.move(tempFile, targetFile)
+        }
       case "file" | null =>
-        // Remove the file if it already exists
-        targetFile.delete()
-        // Symlink the file locally.
-        if (uri.isAbsolute) {
-          // url is absolute, i.e. it starts with "file:///". Extract the source
-          // file's absolute path from the url.
-          val sourceFile = new File(uri)
-          logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-          FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+        val sourceFile = if (uri.isAbsolute) {
+          new File(uri)
+        } else {
+          new File(url)
+        }
+        if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
+          logWarning("File " + targetFile + " exists and does not match contents of " + url +
+            "; using existing version")
         } else {
-          // url is not absolute, i.e. itself is the path to the source file.
-          logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
-          FileUtil.symLink(url, targetFile.getAbsolutePath)
+          // Remove the file if it already exists
+          targetFile.delete()
+          // Symlink the file locally.
+          if (uri.isAbsolute) {
+            // url is absolute, i.e. it starts with "file:///". Extract the source
+            // file's absolute path from the url.
+            val sourceFile = new File(uri)
+            logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+            FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+          } else {
+            // url is not absolute, i.e. itself is the path to the source file.
+            logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
+            FileUtil.symLink(url, targetFile.getAbsolutePath)
+          }
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
@@ -159,8 +179,15 @@
         val conf = new Configuration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
-        val out = new FileOutputStream(targetFile)
+        val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
+        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+          logWarning("File " + targetFile + " exists and does not match contents of " + url +
+            "; using existing version")
+          tempFile.delete()
+        } else {
+          Files.move(tempFile, targetFile)
+        }
     }
     // Decompress the file if it's a .tar or .tar.gz
     if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {