aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala21
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala11
2 files changed, 23 insertions, 9 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6184ad591c..b494ef0dd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -496,11 +496,26 @@ private[spark] class Client(
"to uploading libraries under SPARK_HOME.")
val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
sparkConf.getenv("SPARK_HOME")))
- jarsDir.listFiles().foreach { f =>
- if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
- distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+ val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
+ new File(Utils.getLocalDir(sparkConf)))
+ val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+
+ try {
+ jarsStream.setLevel(0)
+ jarsDir.listFiles().foreach { f =>
+ if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) {
+ jarsStream.putNextEntry(new ZipEntry(f.getName))
+ Files.copy(f, jarsStream)
+ jarsStream.closeEntry()
+ }
}
+ } finally {
+ jarsStream.close()
}
+
+ distribute(jarsArchive.toURI.getPath,
+ resType = LocalResourceType.ARCHIVE,
+ destName = Some(LOCALIZED_LIB_DIR))
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index f196a0d8ca..a408c48d1d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -285,8 +285,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
- anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@@ -295,13 +293,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val jarsDir = new File(libs, "jars")
assert(jarsDir.mkdir())
new FileOutputStream(new File(libs, "RELEASE")).close()
- val userLibs = Utils.createTempDir()
+ val userLib1 = Utils.createTempDir()
+ val userLib2 = Utils.createTempDir()
val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
- val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
+ val jar2 = TestUtils.createJarWithFiles(Map(), userLib1)
// Copy jar2 to jar3 with same name
val jar3 = {
- val target = new File(userLibs, new File(jar1.toURI).getName)
+ val target = new File(userLib2, new File(jar2.toURI).getName)
val input = new FileInputStream(jar2.getPath)
val output = new FileOutputStream(target)
Utils.copyStream(input, output, closeStreams = true)
@@ -315,7 +314,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val tempDir = Utils.createTempDir()
client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
- // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
+ // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be
// ignored.
sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
}