diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-05-30 16:21:49 -0700 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-05-30 16:21:49 -0700 |
commit | bed1b08169df91e97cb9ebaf8e58daeb655ff55d (patch) | |
tree | c05c0b71d8029561dd7095da030dae70a01acfa1 | |
parent | ef77bb73c66ce938e409cbbac32b67badaa5c57d (diff) | |
download | spark-bed1b08169df91e97cb9ebaf8e58daeb655ff55d.tar.gz spark-bed1b08169df91e97cb9ebaf8e58daeb655ff55d.tar.bz2 spark-bed1b08169df91e97cb9ebaf8e58daeb655ff55d.zip |
Do not create symlink for local add file. Instead, copy the file.
This prevents Spark from changing the original file's permission, and
also allow add file to work on non-posix operating systems.
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 78 |
1 files changed, 39 insertions, 39 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 84626df553..ec15326014 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -4,20 +4,26 @@ import java.io._ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket} import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} +import java.util.regex.Pattern + import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.JavaConversions._ import scala.io.Source + import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder + +import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} + import spark.serializer.SerializerInstance import spark.deploy.SparkHadoopUtil -import java.util.regex.Pattern + /** * Various utility methods used by Spark. */ private object Utils extends Logging { + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -68,7 +74,6 @@ private object Utils extends Logging { return buf } - private val shutdownDeletePaths = new collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook @@ -87,19 +92,19 @@ private object Utils extends Logging { } } - // Note: if file is child of some registered path, while not equal to it, then return true; else false - // This is to ensure that two shutdown hooks do not try to delete each others paths - resulting in IOException - // and incomplete cleanup + // Note: if file is child of some registered path, while not equal to it, then return true; + // else false. This is to ensure that two shutdown hooks do not try to delete each others + // paths - resulting in IOException and incomplete cleanup. def hasRootAsShutdownDeleteDir(file: File): Boolean = { - val absolutePath = file.getAbsolutePath() - val retval = shutdownDeletePaths.synchronized { - shutdownDeletePaths.find(path => ! absolutePath.equals(path) && absolutePath.startsWith(path) ).isDefined + shutdownDeletePaths.find { path => + !absolutePath.equals(path) && absolutePath.startsWith(path) + }.isDefined + } + if (retval) { + logInfo("path = " + file + ", already present as root for deletion.") } - - if (retval) logInfo("path = " + file + ", already present as root for deletion.") - retval } @@ -131,7 +136,7 @@ private object Utils extends Logging { if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir) } }) - return dir + dir } /** Copy all data from an InputStream to an OutputStream */ @@ -174,35 +179,30 @@ private object Utils extends Logging { Utils.copyStream(in, out, true) if (targetFile.exists && !Files.equal(tempFile, targetFile)) { tempFile.delete() - throw new SparkException("File " + targetFile + " exists and does not match contents of" + - " " + url) + throw new SparkException( + "File " + targetFile + " exists and does not match contents of" + " " + url) } else { Files.move(tempFile, targetFile) } case "file" | null => - val sourceFile = if (uri.isAbsolute) { - new File(uri) - } else { - new File(url) - } - if (targetFile.exists && !Files.equal(sourceFile, targetFile)) { - throw new SparkException("File " + targetFile + " exists and does not match contents of" + - " " + url) - } else { - // Remove the file if it already exists - targetFile.delete() - // Symlink the file locally. - if (uri.isAbsolute) { - // url is absolute, i.e. it starts with "file:///". Extract the source - // file's absolute path from the url. - val sourceFile = new File(uri) - logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) - FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath) + // In the case of a local file, copy the local file to the target directory. + // Note the difference between uri vs url. + val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url) + if (targetFile.exists) { + // If the target file already exists, warn the user if + if (!Files.equal(sourceFile, targetFile)) { + throw new SparkException( + "File " + targetFile + " exists and does not match contents of" + " " + url) } else { - // url is not absolute, i.e. itself is the path to the source file. - logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath) - FileUtil.symLink(url, targetFile.getAbsolutePath) + // Do nothing if the file contents are the same, i.e. this file has been copied + // previously. + logInfo(sourceFile.getAbsolutePath + " has been previously copied to " + + targetFile.getAbsolutePath) } + } else { + // The file does not exist in the target directory. Copy it there. + logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) + Files.copy(sourceFile, targetFile) } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others @@ -323,8 +323,6 @@ private object Utils extends Logging { InetAddress.getByName(address).getHostName } - - def localHostPort(): String = { val retval = System.getProperty("spark.hostPort", null) if (retval == null) { @@ -382,6 +380,7 @@ private object Utils extends Logging { // Typically, this will be of order of number of nodes in cluster // If not, we should change it to LRUCache or something. private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]() + def parseHostPort(hostPort: String): (String, Int) = { { // Check cache first. @@ -390,7 +389,8 @@ private object Utils extends Logging { } val indx: Int = hostPort.lastIndexOf(':') - // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... but then hadoop does not support ipv6 right now. + // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... + // but then hadoop does not support ipv6 right now. // For now, we assume that if port exists, then it is valid - not check if it is an int > 0 if (-1 == indx) { val retval = (hostPort, 0) |