author    Reynold Xin <rxin@cs.berkeley.edu>  2013-05-30 16:21:49 -0700
committer Reynold Xin <rxin@cs.berkeley.edu>  2013-05-30 16:21:49 -0700
commit  bed1b08169df91e97cb9ebaf8e58daeb655ff55d (patch)
tree    c05c0b71d8029561dd7095da030dae70a01acfa1
parent  ef77bb73c66ce938e409cbbac32b67badaa5c57d (diff)
Do not create symlink for local add file. Instead, copy the file.
This prevents Spark from changing the original file's permissions, and also allows add file to work on non-POSIX operating systems.
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 78
1 file changed, 39 insertions(+), 39 deletions(-)
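
The substance of the change: for a local "file" URI, the file-fetching code now copies the source into the target directory with Guava instead of creating a symlink via Hadoop's FileUtil.symLink. A minimal standalone sketch of the new behavior (hypothetical paths; not the full method from the patch):

    import java.io.File
    import com.google.common.io.Files

    val sourceFile = new File("/data/original.txt")      // hypothetical source
    val targetFile = new File("/work/dir/original.txt")  // hypothetical target
    if (targetFile.exists) {
      // An existing target is only an error if its contents differ.
      if (!Files.equal(sourceFile, targetFile)) {
        throw new Exception("File " + targetFile + " exists and does not match " + sourceFile)
      }
    } else {
      // Copying (rather than symlinking) leaves the source file's permissions
      // untouched and works on non-POSIX operating systems.
      Files.copy(sourceFile, targetFile)
    }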
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 84626df553..ec15326014 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -4,20 +4,26 @@ import java.io._
import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import java.util.regex.Pattern
+
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConversions._
import scala.io.Source
+
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+
import spark.serializer.SerializerInstance
import spark.deploy.SparkHadoopUtil
-import java.util.regex.Pattern
+
/**
* Various utility methods used by Spark.
*/
private object Utils extends Logging {
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -68,7 +74,6 @@ private object Utils extends Logging {
return buf
}
-
private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
// Register the path to be deleted via shutdown hook
@@ -87,19 +92,19 @@ private object Utils extends Logging {
}
}
- // Note: if file is child of some registered path, while not equal to it, then return true; else false
- // This is to ensure that two shutdown hooks do not try to delete each others paths - resulting in IOException
- // and incomplete cleanup
+ // Note: if file is child of some registered path, while not equal to it, then return true;
+ // else false. This is to ensure that two shutdown hooks do not try to delete each others
+ // paths - resulting in IOException and incomplete cleanup.
def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-
val absolutePath = file.getAbsolutePath()
-
val retval = shutdownDeletePaths.synchronized {
- shutdownDeletePaths.find(path => ! absolutePath.equals(path) && absolutePath.startsWith(path) ).isDefined
+ shutdownDeletePaths.find { path =>
+ !absolutePath.equals(path) && absolutePath.startsWith(path)
+ }.isDefined
+ }
+ if (retval) {
+ logInfo("path = " + file + ", already present as root for deletion.")
}
-
- if (retval) logInfo("path = " + file + ", already present as root for deletion.")
-
retval
}
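
For context on the hunk above: a file counts as covered when some registered shutdown-delete path is a strict ancestor of it, i.e. a prefix of its absolute path but not equal to it. The refactoring keeps find(...).isDefined; exists(...) would be the equivalent one-liner. A minimal sketch with hypothetical paths:

    import java.io.File
    import scala.collection.mutable

    val shutdownDeletePaths = mutable.HashSet("/tmp/spark-local-123")

    def hasRootAsShutdownDeleteDir(file: File): Boolean = {
      val absolutePath = file.getAbsolutePath()
      shutdownDeletePaths.exists { path =>
        !absolutePath.equals(path) && absolutePath.startsWith(path)
      }
    }

    hasRootAsShutdownDeleteDir(new File("/tmp/spark-local-123/blocks"))  // true: strict child
    hasRootAsShutdownDeleteDir(new File("/tmp/spark-local-123"))         // false: equal, not a child
    hasRootAsShutdownDeleteDir(new File("/tmp/other"))                   // false: unrelated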
@@ -131,7 +136,7 @@ private object Utils extends Logging {
if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
}
})
- return dir
+ dir
}
/** Copy all data from an InputStream to an OutputStream */
@@ -174,35 +179,30 @@ private object Utils extends Logging {
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
tempFile.delete()
- throw new SparkException("File " + targetFile + " exists and does not match contents of" +
- " " + url)
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
} else {
Files.move(tempFile, targetFile)
}
case "file" | null =>
- val sourceFile = if (uri.isAbsolute) {
- new File(uri)
- } else {
- new File(url)
- }
- if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
- throw new SparkException("File " + targetFile + " exists and does not match contents of" +
- " " + url)
- } else {
- // Remove the file if it already exists
- targetFile.delete()
- // Symlink the file locally.
- if (uri.isAbsolute) {
- // url is absolute, i.e. it starts with "file:///". Extract the source
- // file's absolute path from the url.
- val sourceFile = new File(uri)
- logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
- FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+ // In the case of a local file, copy the local file to the target directory.
+      // Note the difference between uri and url.
+ val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+ if (targetFile.exists) {
+        // If the target file already exists, check whether its contents match the source.
+ if (!Files.equal(sourceFile, targetFile)) {
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
} else {
- // url is not absolute, i.e. itself is the path to the source file.
- logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
- FileUtil.symLink(url, targetFile.getAbsolutePath)
+ // Do nothing if the file contents are the same, i.e. this file has been copied
+ // previously.
+ logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+ + targetFile.getAbsolutePath)
}
+ } else {
+ // The file does not exist in the target directory. Copy it there.
+ logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+ Files.copy(sourceFile, targetFile)
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
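
For that fallback case, a rough standalone sketch of fetching a remote file through the Hadoop FileSystem API (hypothetical URL and target; the real code in this file uses Utils.copyStream and SparkHadoopUtil rather than this exact shape):

    import java.io.{File, FileOutputStream}
    import java.net.URI
    import com.google.common.io.ByteStreams
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val url = "hdfs://namenode:8020/data/file.txt"   // hypothetical remote file
    val targetFile = new File("/work/dir/file.txt")  // hypothetical target

    val uri = new URI(url)
    val fs = FileSystem.get(uri, new Configuration())
    val in = fs.open(new Path(uri))
    val out = new FileOutputStream(targetFile)
    ByteStreams.copy(in, out)  // stream the remote bytes to the local file
    in.close()
    out.close()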
@@ -323,8 +323,6 @@ private object Utils extends Logging {
InetAddress.getByName(address).getHostName
}
-
-
def localHostPort(): String = {
val retval = System.getProperty("spark.hostPort", null)
if (retval == null) {
@@ -382,6 +380,7 @@ private object Utils extends Logging {
// Typically, this will be of order of number of nodes in cluster
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
+
def parseHostPort(hostPort: String): (String, Int) = {
{
// Check cache first.
@@ -390,7 +389,8 @@ private object Utils extends Logging {
}
val indx: Int = hostPort.lastIndexOf(':')
- // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... but then hadoop does not support ipv6 right now.
+ // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
+ // but then hadoop does not support ipv6 right now.
    // For now, we assume that if a port exists then it is valid - we do not check that it is an int > 0
if (-1 == indx) {
val retval = (hostPort, 0)
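
The hunk is truncated here, but the visible shape of parseHostPort is: consult the ConcurrentHashMap cache, then split on the last ':', returning port 0 when no colon is present. A simplified sketch of that split, without the cache (the branch after the truncation is an assumption):

    // Assumes no IPv6 handling, matching the caveat in the comment above.
    def parseHostPortSketch(hostPort: String): (String, Int) = {
      val indx = hostPort.lastIndexOf(':')
      if (indx == -1) {
        (hostPort, 0)  // no port present
      } else {
        (hostPort.substring(0, indx), hostPort.substring(indx + 1).toInt)
      }
    }

    parseHostPortSketch("node1.example.com:7077")  // ("node1.example.com", 7077)
    parseHostPortSketch("node1.example.com")       // ("node1.example.com", 0)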