author     Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-12-28 17:37:13 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-12-28 17:37:13 -0800
commit     397e67103c18ba22c8c63e9692f0096cd0094797 (patch)
tree       498230745f557b623f67d96506e9dafc4d9e9194 /core
parent     d64fa72d2e4a8290d15e65459337f544e55b3b48 (diff)
download   spark-397e67103c18ba22c8c63e9692f0096cd0094797.tar.gz
           spark-397e67103c18ba22c8c63e9692f0096cd0094797.tar.bz2
           spark-397e67103c18ba22c8c63e9692f0096cd0094797.zip
Change Utils.fetchFile() warning to SparkException.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Utils.scala                          | 15
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala |  2
2 files changed, 10 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index c10b415a93..0e7007459d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -128,6 +128,9 @@ private object Utils extends Logging {
/**
* Download a file requested by the executor. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+ *
+ * Throws SparkException if the target file already exists and has different contents than
+ * the requested file.
*/
def fetchFile(url: String, targetDir: File) {
val filename = url.split("/").last
@@ -142,9 +145,9 @@ private object Utils extends Logging {
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- logWarning("File " + targetFile + " exists and does not match contents of " + url +
- "; using existing version")
tempFile.delete()
+ throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+ " " + url)
} else {
Files.move(tempFile, targetFile)
}
@@ -155,8 +158,8 @@ private object Utils extends Logging {
new File(url)
}
if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
- logWarning("File " + targetFile + " exists and does not match contents of " + url +
- "; using existing version")
+ throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+ " " + url)
} else {
// Remove the file if it already exists
targetFile.delete()
@@ -182,9 +185,9 @@ private object Utils extends Logging {
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- logWarning("File " + targetFile + " exists and does not match contents of " + url +
- "; using existing version")
tempFile.delete()
+ throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+ " " + url)
} else {
Files.move(tempFile, targetFile)
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 5d927efb65..2593c0e3a0 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -108,7 +108,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
- this.synchronized {
+ synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)