From 80f59e17e2dc7b65026a5a16f2dff65e01964212 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 7 Oct 2012 00:54:38 -0700 Subject: Fixed a bug in addFile where, if the file is specified as a "file:///" URL, the symlink is created incorrectly in local mode. --- core/src/main/scala/spark/Utils.scala | 27 ++++++++++++++------- core/src/test/scala/spark/FileServerSuite.scala | 31 ++++++++++++++++++------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index a480fe046d..a7ef22e81b 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -71,7 +71,7 @@ private object Utils extends Logging { while (dir == null) { attempts += 1 if (attempts > maxAttempts) { - throw new IOException("Failed to create a temp directory after " + maxAttempts + + throw new IOException("Failed to create a temp directory after " + maxAttempts + " attempts!") } try { @@ -122,7 +122,7 @@ private object Utils extends Logging { val out = new FileOutputStream(localPath) Utils.copyStream(in, out, true) } - + /** * Download a file requested by the executor. Supports fetching the file in a variety of ways, * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter. @@ -140,9 +140,18 @@ private object Utils extends Logging { case "file" | null => // Remove the file if it already exists targetFile.delete() - // Symlink the file locally - logInfo("Symlinking " + url + " to " + targetFile) - FileUtil.symLink(url, targetFile.toString) + // Symlink the file locally. + if (uri.isAbsolute) { + // url is absolute, i.e. it starts with "file:///". Extract the source + // file's absolute path from the url. + val sourceFile = new File(uri) + println("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) + FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath) + } else { + // url is not absolute, i.e. 
itself is the path to the source file. + logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath) + FileUtil.symLink(url, targetFile.getAbsolutePath) + } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others val uri = new URI(url) @@ -208,7 +217,7 @@ private object Utils extends Logging { def localHostName(): String = { customHostname.getOrElse(InetAddress.getLocalHost.getHostName) } - + /** * Returns a standard ThreadFactory except all threads are daemons. */ @@ -232,10 +241,10 @@ private object Utils extends Logging { return threadPool } - + /** - * Return the string to tell how long has passed in seconds. The passing parameter should be in - * millisecond. + * Return the string to tell how long has passed in seconds. The passing parameter should be in + * millisecond. */ def getUsedTimeMs(startTimeMs: Long): String = { return " " + (System.currentTimeMillis - startTimeMs) + " ms " diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index a25b61dcbd..b4283d9604 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -7,11 +7,11 @@ import java.io.{File, PrintWriter, FileReader, BufferedReader} import SparkContext._ class FileServerSuite extends FunSuite with BeforeAndAfter { - + @transient var sc: SparkContext = _ @transient var tmpFile : File = _ @transient var testJarFile : File = _ - + before { // Create a sample text file val tmpdir = new File(Files.createTempDir(), "test") @@ -21,7 +21,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { pw.println("100") pw.close() } - + after { if (sc != null) { sc.stop() @@ -34,7 +34,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - + test("Distributing files locally") { sc = new 
SparkContext("local[4]", "test") sc.addFile(tmpFile.toString) @@ -45,16 +45,29 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { in.close() _ * fileVal + _ * fileVal }.collect - println(result) + assert(result.toSet === Set((1,200), (2,300), (3,500))) + } + + test("Distributing files locally using URL as input") { + // addFile("file:///....") + sc = new SparkContext("local[4]", "test") + sc.addFile((new File(tmpFile.toString)).toURL.toString) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val result = sc.parallelize(testData).reduceByKey { + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) + val fileVal = in.readLine().toInt + in.close() + _ * fileVal + _ * fileVal + }.collect assert(result.toSet === Set((1,200), (2,300), (3,500))) } test ("Dynamically adding JARS locally") { sc = new SparkContext("local[4]", "test") - val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile() + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => + val result = sc.parallelize(testData).reduceByKey { (x,y) => val fac = Thread.currentThread.getContextClassLoader() .loadClass("org.uncommons.maths.Maths") .getDeclaredMethod("factorial", classOf[Int]) @@ -81,10 +94,10 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile() + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => + val result = 
sc.parallelize(testData).reduceByKey { (x,y) => val fac = Thread.currentThread.getContextClassLoader() .loadClass("org.uncommons.maths.Maths") .getDeclaredMethod("factorial", classOf[Int]) -- cgit v1.2.3