aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-04-28 16:39:49 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-04-28 16:39:49 -0700
commit2398e3d69c9a675d651c192107953de8e6c2aecd (patch)
treefe0d38664199446c8c8e76c5a4f38196cdbeb329
parent4f4721a21cc9acc2b6f685bbfc8757d29563a775 (diff)
downloadspark-2398e3d69c9a675d651c192107953de8e6c2aecd.tar.gz
spark-2398e3d69c9a675d651c192107953de8e6c2aecd.tar.bz2
spark-2398e3d69c9a675d651c192107953de8e6c2aecd.zip
[SPARK-14836][YARN] Zip all the jars before uploading to distributed cache
## What changes were proposed in this pull request? <copy form JIRA> Currently if neither `spark.yarn.jars` nor `spark.yarn.archive` is set (by default), Spark on yarn code will upload all the jars in the folder separately into distributed cache, this is quite time consuming, and very verbose, instead of upload jars separately into distributed cache, here changes to zip all the jars first, and then put into distributed cache. This will significantly improve the speed of starting time. ## How was this patch tested? Unit test and local integrated test is done. Verified with SparkPi both in spark cluster and client mode. Author: jerryshao <sshao@hortonworks.com> Closes #12597 from jerryshao/SPARK-14836.
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala21
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala11
2 files changed, 23 insertions, 9 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6184ad591c..b494ef0dd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -496,11 +496,26 @@ private[spark] class Client(
"to uploading libraries under SPARK_HOME.")
val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
sparkConf.getenv("SPARK_HOME")))
- jarsDir.listFiles().foreach { f =>
- if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
- distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+ val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
+ new File(Utils.getLocalDir(sparkConf)))
+ val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+
+ try {
+ jarsStream.setLevel(0)
+ jarsDir.listFiles().foreach { f =>
+ if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) {
+ jarsStream.putNextEntry(new ZipEntry(f.getName))
+ Files.copy(f, jarsStream)
+ jarsStream.closeEntry()
+ }
}
+ } finally {
+ jarsStream.close()
}
+
+ distribute(jarsArchive.toURI.getPath,
+ resType = LocalResourceType.ARCHIVE,
+ destName = Some(LOCALIZED_LIB_DIR))
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index f196a0d8ca..a408c48d1d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -285,8 +285,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
- anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@@ -295,13 +293,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val jarsDir = new File(libs, "jars")
assert(jarsDir.mkdir())
new FileOutputStream(new File(libs, "RELEASE")).close()
- val userLibs = Utils.createTempDir()
+ val userLib1 = Utils.createTempDir()
+ val userLib2 = Utils.createTempDir()
val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
- val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
+ val jar2 = TestUtils.createJarWithFiles(Map(), userLib1)
// Copy jar2 to jar3 with same name
val jar3 = {
- val target = new File(userLibs, new File(jar1.toURI).getName)
+ val target = new File(userLib2, new File(jar2.toURI).getName)
val input = new FileInputStream(jar2.getPath)
val output = new FileOutputStream(target)
Utils.copyStream(input, output, closeStreams = true)
@@ -315,7 +314,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val tempDir = Utils.createTempDir()
client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
- // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
+ // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be
// ignored.
sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
}