diff options
Diffstat (limited to 'core')
3 files changed, 41 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index 3d2cabcdfd..050778a895 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -176,26 +176,31 @@ private[deploy] object RPackageUtils extends Logging { val file = new File(Utils.resolveURI(jarPath)) if (file.exists()) { val jar = new JarFile(file) - if (checkManifestForR(jar)) { - print(s"$file contains R source code. Now installing package.", printStream, Level.INFO) - val rSource = extractRFolder(jar, printStream, verbose) - if (RUtils.rPackages.isEmpty) { - RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath) - } - try { - if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) { - print(s"ERROR: Failed to build R package in $file.", printStream) - print(RJarDoc, printStream) + Utils.tryWithSafeFinally { + if (checkManifestForR(jar)) { + print(s"$file contains R source code. Now installing package.", printStream, Level.INFO) + val rSource = extractRFolder(jar, printStream, verbose) + if (RUtils.rPackages.isEmpty) { + RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath) } - } finally { // clean up - if (!rSource.delete()) { - logWarning(s"Error deleting ${rSource.getPath()}") + try { + if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) { + print(s"ERROR: Failed to build R package in $file.", printStream) + print(RJarDoc, printStream) + } + } finally { + // clean up + if (!rSource.delete()) { + logWarning(s"Error deleting ${rSource.getPath()}") + } + } + } else { + if (verbose) { + print(s"$file doesn't contain R source code, skipping...", printStream) } } - } else { - if (verbose) { - print(s"$file doesn't contain R source code, skipping...", printStream) - } + } { + jar.close() } } else { print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING) @@ -231,8 +236,12 @@ private[deploy] object RPackageUtils extends Logging { val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false)) try { filesToBundle.foreach { file => - // get the relative paths for proper naming in the zip file - val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "") + // Get the relative paths for proper naming in the ZIP file. Note that + // we convert dir to URI to force / and then remove trailing / that show up for + // directories because the separator should always be / for according to ZIP + // specification and therefore `relPath` here should be, for example, + // "/packageTest/def.R" or "/test.R". + val relPath = file.toURI.toString.replaceFirst(dir.toURI.toString.stripSuffix("/"), "") val fis = new FileInputStream(file) val zipEntry = new ZipEntry(relPath) zipOutputStream.putNextEntry(zipEntry) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index f8054f5fd7..a73b300ec2 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -61,7 +61,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext pw.close() // Path to tmpFile - tmpFilePath = "file://" + tmpFile.getAbsolutePath + tmpFilePath = tmpFile.toURI.toString } after { @@ -181,7 +181,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext sc.textFile(tmpFilePath, 4) .map(key => (key, 1)) .reduceByKey(_ + _) - .saveAsTextFile("file://" + tmpFile.getAbsolutePath) + .saveAsTextFile(tmpFile.toURI.toString) sc.listenerBus.waitUntilEmpty(500) assert(inputRead == numRecords) @@ -197,7 +197,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext val numPartitions = 2 val cartVector = 0 to 9 val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt") - val cartFilePath = "file://" + cartFile.getAbsolutePath + val cartFilePath = cartFile.toURI.toString // write files to disk so we can read them later. sc.parallelize(cartVector).saveAsTextFile(cartFilePath) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 230e2c34d0..4c3d0b1021 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -119,19 +119,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } test("Event log name") { + val baseDirUri = Utils.resolveURI("/base-dir") // without compression - assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath( - Utils.resolveURI("/base-dir"), "app1", None)) + assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath( + baseDirUri, "app1", None)) // with compression - assert(s"file:/base-dir/app1.lzf" === - EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf"))) + assert(s"${baseDirUri.toString}/app1.lzf" === + EventLoggingListener.getLogPath(baseDirUri, "app1", None, Some("lzf"))) // illegal characters in app ID - assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" === - EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), + assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" === + EventLoggingListener.getLogPath(baseDirUri, "a fine:mind$dollar{bills}.1", None)) // illegal characters in app ID with compression - assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" === - EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), + assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" === + EventLoggingListener.getLogPath(baseDirUri, "a fine:mind$dollar{bills}.1", None, Some("lz4"))) } @@ -289,7 +290,7 @@ object EventLoggingListenerSuite { val conf = new SparkConf conf.set("spark.eventLog.enabled", "true") conf.set("spark.eventLog.testing", "true") - conf.set("spark.eventLog.dir", logDir.toString) + conf.set("spark.eventLog.dir", logDir.toUri.toString) compressionCodec.foreach { codec => conf.set("spark.eventLog.compress", "true") conf.set("spark.io.compression.codec", codec) |