aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-10-07 00:54:38 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2012-10-07 00:54:38 -0700
commit80f59e17e2dc7b65026a5a16f2dff65e01964212 (patch)
treebac9b00e9a69885652b4f023915a0a63ba9fb3e5 /core
parenteca570f66a3d93bc1745a9fcf8410ad0a9db3e64 (diff)
downloadspark-80f59e17e2dc7b65026a5a16f2dff65e01964212.tar.gz
spark-80f59e17e2dc7b65026a5a16f2dff65e01964212.tar.bz2
spark-80f59e17e2dc7b65026a5a16f2dff65e01964212.zip
Fixed a bug in addFile where, if the file is specified as "file:///", the
symlink is created incorrectly in local mode.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/Utils.scala27
-rw-r--r--core/src/test/scala/spark/FileServerSuite.scala31
2 files changed, 40 insertions, 18 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index a480fe046d..a7ef22e81b 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -71,7 +71,7 @@ private object Utils extends Logging {
while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
- throw new IOException("Failed to create a temp directory after " + maxAttempts +
+ throw new IOException("Failed to create a temp directory after " + maxAttempts +
" attempts!")
}
try {
@@ -122,7 +122,7 @@ private object Utils extends Logging {
val out = new FileOutputStream(localPath)
Utils.copyStream(in, out, true)
}
-
+
/**
* Download a file requested by the executor. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
@@ -140,9 +140,18 @@ private object Utils extends Logging {
case "file" | null =>
// Remove the file if it already exists
targetFile.delete()
- // Symlink the file locally
- logInfo("Symlinking " + url + " to " + targetFile)
- FileUtil.symLink(url, targetFile.toString)
+ // Symlink the file locally.
+ if (uri.isAbsolute) {
+ // url is absolute, i.e. it starts with "file:///". Extract the source
+ // file's absolute path from the url.
+ val sourceFile = new File(uri)
+ println("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+ FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+ } else {
+ // url is not absolute, i.e. itself is the path to the source file.
+ logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
+ FileUtil.symLink(url, targetFile.getAbsolutePath)
+ }
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val uri = new URI(url)
@@ -208,7 +217,7 @@ private object Utils extends Logging {
def localHostName(): String = {
customHostname.getOrElse(InetAddress.getLocalHost.getHostName)
}
-
+
/**
* Returns a standard ThreadFactory except all threads are daemons.
*/
@@ -232,10 +241,10 @@ private object Utils extends Logging {
return threadPool
}
-
+
/**
- * Return the string to tell how long has passed in seconds. The passing parameter should be in
- * millisecond.
+ * Return the string to tell how long has passed in seconds. The passing parameter should be in
+ * millisecond.
*/
def getUsedTimeMs(startTimeMs: Long): String = {
return " " + (System.currentTimeMillis - startTimeMs) + " ms "
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index a25b61dcbd..b4283d9604 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -7,11 +7,11 @@ import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
class FileServerSuite extends FunSuite with BeforeAndAfter {
-
+
@transient var sc: SparkContext = _
@transient var tmpFile : File = _
@transient var testJarFile : File = _
-
+
before {
// Create a sample text file
val tmpdir = new File(Files.createTempDir(), "test")
@@ -21,7 +21,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
pw.println("100")
pw.close()
}
-
+
after {
if (sc != null) {
sc.stop()
@@ -34,7 +34,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
-
+
test("Distributing files locally") {
sc = new SparkContext("local[4]", "test")
sc.addFile(tmpFile.toString)
@@ -45,16 +45,29 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
in.close()
_ * fileVal + _ * fileVal
}.collect
- println(result)
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test("Distributing files locally using URL as input") {
+ // addFile("file:///....")
+ sc = new SparkContext("local[4]", "test")
+ sc.addFile((new File(tmpFile.toString)).toURL.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
- val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
- val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
val fac = Thread.currentThread.getContextClassLoader()
.loadClass("org.uncommons.maths.Maths")
.getDeclaredMethod("factorial", classOf[Int])
@@ -81,10 +94,10 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
- val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
- val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
val fac = Thread.currentThread.getContextClassLoader()
.loadClass("org.uncommons.maths.Maths")
.getDeclaredMethod("factorial", classOf[Int])