author     Marcelo Vanzin <vanzin@cloudera.com>    2015-03-04 12:58:39 -0800
committer  Andrew Or <andrew@databricks.com>       2015-03-04 12:58:39 -0800
commit     3a35a0dfe940843c3f3a5f51acfe24def488faa9 (patch)
tree       ddc6dace0de76c7b1a1ee9cb459b0f2ada80222c
parent     f6773edce05300faf1e673ea2d1782dfb9b8b998 (diff)
[SPARK-6144] [core] Fix addFile when source files are on "hdfs:"
The code failed in two modes: it complained when it tried to re-create a
directory that already existed, and it was placing some files in the wrong
parent directory. The patch fixes both issues.

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: trystanleftwich <trystan@atscale.com>

Closes #4894 from vanzin/SPARK-6144 and squashes the following commits:

100b3a1 [Marcelo Vanzin] Style fix.
58266aa [Marcelo Vanzin] Fix fetchHcfs file for directories.
91733b7 [trystanleftwich] [SPARK-6144] When in cluster mode using ADD JAR with a hdfs:// sourced jar will fail
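For context, doFetchFile routes non-HTTP URIs (hdfs: and other Hadoop-compatible
filesystems) through the fetchHcfsFile helper patched below, which backs
SparkContext.addFile and addJar. A minimal, hypothetical reproduction of the
user-facing failure is sketched here; the app name, namenode address, and jar
path are placeholders, not taken from the patch.

// Hypothetical reproduction sketch (assumed cluster and paths). Before this
// fix, fetching an "hdfs:"-sourced file could fail on an already-existing
// target directory, or place the fetched file in the wrong parent directory.
import org.apache.spark.{SparkConf, SparkContext}

object Spark6144Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-6144-repro"))
    // The same kind of path Spark SQL's "ADD JAR hdfs://..." hands to addJar.
    sc.addFile("hdfs://namenode:8020/user/example/libs/some-lib.jar")
    sc.stop()
  }
}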
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala      | 28
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 85
2 files changed, 63 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4644088f19..d3dc1d09cb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -624,7 +624,8 @@ private[spark] object Utils extends Logging {
       case _ =>
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val path = new Path(uri)
-        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
+          filename = Some(filename))
     }
   }
@@ -639,19 +640,22 @@ private[spark] object Utils extends Logging {
       fs: FileSystem,
       conf: SparkConf,
       hadoopConf: Configuration,
-      fileOverwrite: Boolean): Unit = {
-    if (!targetDir.mkdir()) {
+      fileOverwrite: Boolean,
+      filename: Option[String] = None): Unit = {
+    if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
-    fs.listStatus(path).foreach { fileStatus =>
-      val innerPath = fileStatus.getPath
-      if (fileStatus.isDir) {
-        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
-          fileOverwrite)
-      } else {
-        val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, innerPath.getName)
-        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+    val dest = new File(targetDir, filename.getOrElse(path.getName))
+    if (fs.isFile(path)) {
+      val in = fs.open(path)
+      try {
+        downloadFile(path.toString, in, dest, fileOverwrite)
+      } finally {
+        in.close()
+      }
+    } else {
+      fs.listStatus(path).foreach { fileStatus =>
+        fetchHcfsFile(fileStatus.getPath(), dest, fs, conf, hadoopConf, fileOverwrite)
       }
     }
   }
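Read outside the diff, the new control flow is: create the target directory
only if it is missing (tolerating a pre-existing one), copy a plain file
directly into it under an optional override name, and recurse for directories
so children land under the copied directory rather than the original parent.
A minimal standalone sketch of that pattern follows; HcfsCopySketch and
copyToLocal are names invented here (the real method is the private
Utils.fetchHcfsFile above), and it substitutes java.nio stream copying for
Spark's downloadFile.

import java.io.{File, IOException}
import java.nio.file.{Files, StandardCopyOption}

import org.apache.hadoop.fs.{FileSystem, Path}

object HcfsCopySketch {
  // Recursively copy a Hadoop-filesystem path into a local directory.
  def copyToLocal(
      path: Path,
      targetDir: File,
      fs: FileSystem,
      filename: Option[String] = None): Unit = {
    // Tolerate an existing target directory; failing on re-creation was one
    // of the two bugs this patch fixes.
    if (!targetDir.exists() && !targetDir.mkdir()) {
      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
    }
    val dest = new File(targetDir, filename.getOrElse(path.getName))
    if (fs.isFile(path)) {
      // A single file: stream it straight to its local destination.
      val in = fs.open(path)
      try {
        Files.copy(in, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
      } finally {
        in.close()
      }
    } else {
      // A directory: recurse with dest (not targetDir) as the parent, so
      // nested entries end up under the copied directory; placing them in
      // the wrong parent was the other bug.
      fs.listStatus(path).foreach { status =>
        copyToLocal(status.getPath, dest, fs)
      }
    }
  }
}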
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fe2b644251..fd77753c0d 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -208,18 +208,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

     // although child1 is old, child2 is still new so return true
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

     child2.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

     parent.setLastModified(System.currentTimeMillis - (1000 * 30))
     // although parent and its immediate children are new, child3 is still old
     // we expect a full recursive search for new files.
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

     child3.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }
test("resolveURI") {
@@ -339,21 +339,21 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(!tempDir1.exists())

     val tempDir2 = Utils.createTempDir()
-    val tempFile1 = new File(tempDir2, "foo.txt")
-    Files.touch(tempFile1)
-    assert(tempFile1.exists())
-    Utils.deleteRecursively(tempFile1)
-    assert(!tempFile1.exists())
+    val sourceFile1 = new File(tempDir2, "foo.txt")
+    Files.touch(sourceFile1)
+    assert(sourceFile1.exists())
+    Utils.deleteRecursively(sourceFile1)
+    assert(!sourceFile1.exists())

     val tempDir3 = new File(tempDir2, "subdir")
     assert(tempDir3.mkdir())
-    val tempFile2 = new File(tempDir3, "bar.txt")
-    Files.touch(tempFile2)
-    assert(tempFile2.exists())
+    val sourceFile2 = new File(tempDir3, "bar.txt")
+    Files.touch(sourceFile2)
+    assert(sourceFile2.exists())
     Utils.deleteRecursively(tempDir2)
     assert(!tempDir2.exists())
     assert(!tempDir3.exists())
-    assert(!tempFile2.exists())
+    assert(!sourceFile2.exists())
   }
test("loading properties from file") {
@@ -386,30 +386,39 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
   }

   test("fetch hcfs dir") {
-    val tempDir = Utils.createTempDir()
-    val innerTempDir = Utils.createTempDir(tempDir.getPath)
-    val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
-    val targetDir = new File("target-dir")
-    Files.write("some text", tempFile, UTF_8)
-
-    try {
-      val path = new Path("file://" + tempDir.getAbsolutePath)
-      val conf = new Configuration()
-      val fs = Utils.getHadoopFileSystem(path.toString, conf)
-      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
-      assert(targetDir.exists())
-      assert(targetDir.isDirectory())
-      val newInnerDir = new File(targetDir, innerTempDir.getName)
-      println("inner temp dir: " + innerTempDir.getName)
-      targetDir.listFiles().map(_.getName).foreach(println)
-      assert(newInnerDir.exists())
-      assert(newInnerDir.isDirectory())
-      val newInnerFile = new File(newInnerDir, tempFile.getName)
-      assert(newInnerFile.exists())
-      assert(newInnerFile.isFile())
-    } finally {
-      Utils.deleteRecursively(tempDir)
-      Utils.deleteRecursively(targetDir)
-    }
+    val sourceDir = Utils.createTempDir()
+    val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath)
+    val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+    val targetDir = new File(Utils.createTempDir(), "target-dir")
+    Files.write("some text", sourceFile, UTF_8)
+
+    val path = new Path("file://" + sourceDir.getAbsolutePath)
+    val conf = new Configuration()
+    val fs = Utils.getHadoopFileSystem(path.toString, conf)
+
+    assert(!targetDir.isDirectory())
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+    assert(targetDir.isDirectory())
+
+    // Copy again to make sure it doesn't error if the dir already exists.
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+
+    val destDir = new File(targetDir, sourceDir.getName())
+    assert(destDir.isDirectory())
+
+    val destInnerDir = new File(destDir, innerSourceDir.getName)
+    assert(destInnerDir.isDirectory())
+
+    val destInnerFile = new File(destInnerDir, sourceFile.getName)
+    assert(destInnerFile.isFile())
+
+    val filePath = new Path("file://" + sourceFile.getAbsolutePath)
+    val testFileDir = new File("test-filename")
+    val testFileName = "testFName"
+    val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+    Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+      conf, false, Some(testFileName))
+    val newFileName = new File(testFileDir, testFileName)
+    assert(newFileName.isFile())
   }
}
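As a usage note, the last part of the test above exercises the new filename
parameter, which lets a single remote file be stored locally under a different
name. The same idea, shown with the hypothetical HcfsCopySketch from earlier
(the namenode address and paths are illustrative, not from the patch):

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameOnFetch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val src = new Path("hdfs://namenode:8020/user/example/libs/some-lib.jar")
    val fs = FileSystem.get(src.toUri, conf)
    // Fetch one remote file but save it locally as "renamed.jar", mirroring
    // the filename argument that doFetchFile now threads through.
    HcfsCopySketch.copyToLocal(src, new File("/tmp/jars"), fs, Some("renamed.jar"))
  }
}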