aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala47
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala19
3 files changed, 41 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 3d2cabcdfd..050778a895 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -176,26 +176,31 @@ private[deploy] object RPackageUtils extends Logging {
val file = new File(Utils.resolveURI(jarPath))
if (file.exists()) {
val jar = new JarFile(file)
- if (checkManifestForR(jar)) {
- print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
- val rSource = extractRFolder(jar, printStream, verbose)
- if (RUtils.rPackages.isEmpty) {
- RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
- }
- try {
- if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
- print(s"ERROR: Failed to build R package in $file.", printStream)
- print(RJarDoc, printStream)
+ Utils.tryWithSafeFinally {
+ if (checkManifestForR(jar)) {
+ print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
+ val rSource = extractRFolder(jar, printStream, verbose)
+ if (RUtils.rPackages.isEmpty) {
+ RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
}
- } finally { // clean up
- if (!rSource.delete()) {
- logWarning(s"Error deleting ${rSource.getPath()}")
+ try {
+ if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
+ print(s"ERROR: Failed to build R package in $file.", printStream)
+ print(RJarDoc, printStream)
+ }
+ } finally {
+ // clean up
+ if (!rSource.delete()) {
+ logWarning(s"Error deleting ${rSource.getPath()}")
+ }
+ }
+ } else {
+ if (verbose) {
+ print(s"$file doesn't contain R source code, skipping...", printStream)
}
}
- } else {
- if (verbose) {
- print(s"$file doesn't contain R source code, skipping...", printStream)
- }
+ } {
+ jar.close()
}
} else {
print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
@@ -231,8 +236,12 @@ private[deploy] object RPackageUtils extends Logging {
val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
try {
filesToBundle.foreach { file =>
- // get the relative paths for proper naming in the zip file
- val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
+ // Get the relative paths for proper naming in the ZIP file. Note that
+ // we convert dir to URI to force / and then remove trailing / that show up for
+ directories because the separator should always be / according to the ZIP
+ // specification and therefore `relPath` here should be, for example,
+ // "/packageTest/def.R" or "/test.R".
+ val relPath = file.toURI.toString.replaceFirst(dir.toURI.toString.stripSuffix("/"), "")
val fis = new FileInputStream(file)
val zipEntry = new ZipEntry(relPath)
zipOutputStream.putNextEntry(zipEntry)
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8054f5fd7..a73b300ec2 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -61,7 +61,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
pw.close()
// Path to tmpFile
- tmpFilePath = "file://" + tmpFile.getAbsolutePath
+ tmpFilePath = tmpFile.toURI.toString
}
after {
@@ -181,7 +181,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.textFile(tmpFilePath, 4)
.map(key => (key, 1))
.reduceByKey(_ + _)
- .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
+ .saveAsTextFile(tmpFile.toURI.toString)
sc.listenerBus.waitUntilEmpty(500)
assert(inputRead == numRecords)
@@ -197,7 +197,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
val numPartitions = 2
val cartVector = 0 to 9
val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
- val cartFilePath = "file://" + cartFile.getAbsolutePath
+ val cartFilePath = cartFile.toURI.toString
// write files to disk so we can read them later.
sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 230e2c34d0..4c3d0b1021 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -119,19 +119,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}
test("Event log name") {
+ val baseDirUri = Utils.resolveURI("/base-dir")
// without compression
- assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
- Utils.resolveURI("/base-dir"), "app1", None))
+ assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath(
+ baseDirUri, "app1", None))
// with compression
- assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
+ assert(s"${baseDirUri.toString}/app1.lzf" ===
+ EventLoggingListener.getLogPath(baseDirUri, "app1", None, Some("lzf")))
// illegal characters in app ID
- assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+ assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" ===
+ EventLoggingListener.getLogPath(baseDirUri,
"a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
- assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+ assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" ===
+ EventLoggingListener.getLogPath(baseDirUri,
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
@@ -289,7 +290,7 @@ object EventLoggingListenerSuite {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.testing", "true")
- conf.set("spark.eventLog.dir", logDir.toString)
+ conf.set("spark.eventLog.dir", logDir.toUri.toString)
compressionCodec.foreach { codec =>
conf.set("spark.eventLog.compress", "true")
conf.set("spark.io.compression.codec", codec)