aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/main
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-03-11 07:54:57 -0600
committerTom Graves <tgraves@yahoo-inc.com>2016-03-11 07:54:57 -0600
commit07f1c5447753a3d593cd6ececfcb03c11b1cf8ff (patch)
tree74c4c9f81e64cc1ddde0b1c5e554a836808609e1 /yarn/src/main
parent8fff0f92a4aca90b62c6e272eabcbb0257ba38d5 (diff)
downloadspark-07f1c5447753a3d593cd6ececfcb03c11b1cf8ff.tar.gz
spark-07f1c5447753a3d593cd6ececfcb03c11b1cf8ff.tar.bz2
spark-07f1c5447753a3d593cd6ececfcb03c11b1cf8ff.zip
[SPARK-13577][YARN] Allow Spark jar to be multiple jars, archive.
In preparation for the demise of assemblies, this change allows the YARN backend to use multiple jars and globs as the "Spark jar". The config option has been renamed to "spark.yarn.jars" to reflect that. A second option "spark.yarn.archive" was also added; if set, this takes precedence and uploads an archive expected to contain the jar files with the Spark code and its dependencies. Existing deployments should keep working, mostly. This change drops support for the "SPARK_JAR" environment variable, and also does not fall back to using "jarOfClass" if no configuration is set, falling back to finding files under SPARK_HOME instead. This should be fine since "jarOfClass" probably wouldn't work unless you were using spark-submit anyway. Tested with the unit tests, and trying the different config options on a YARN cluster. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #11500 from vanzin/SPARK-13577.
Diffstat (limited to 'yarn/src/main')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala108
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala10
2 files changed, 86 insertions, 32 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6ca9669002..0b5ceb768c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -423,7 +423,63 @@ private[spark] class Client(
}
/**
- * Copy the given main resource to the distributed cache if the scheme is not "local".
+ * Add Spark to the cache. There are two settings that control what files to add to the cache:
+ * - if a Spark archive is defined, use the archive. The archive is expected to contain
+ * jar files at its root directory.
+ * - if a list of jars is provided, filter the non-local ones, resolve globs, and
+ * add the found files to the cache.
+ *
+ * Note that the archive cannot be a "local" URI. If none of the above settings are found,
+ * then upload all files found in $SPARK_HOME/jars.
+ *
+ * TODO: currently the code looks in $SPARK_HOME/lib while the work to replace assemblies
+ * with a directory full of jars is ongoing.
+ */
+ val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
+ if (sparkArchive.isDefined) {
+ val archive = sparkArchive.get
+ require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+ distribute(Utils.resolveURI(archive).toString,
+ resType = LocalResourceType.ARCHIVE,
+ destName = Some(LOCALIZED_LIB_DIR))
+ } else {
+ sparkConf.get(SPARK_JARS) match {
+ case Some(jars) =>
+ // Break the list of jars to upload, and resolve globs.
+ val localJars = new ArrayBuffer[String]()
+ jars.foreach { jar =>
+ if (!isLocalUri(jar)) {
+ val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+ val pathFs = FileSystem.get(path.toUri(), hadoopConf)
+ pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
+ distribute(entry.getPath().toUri().toString(),
+ targetDir = Some(LOCALIZED_LIB_DIR))
+ }
+ } else {
+ localJars += jar
+ }
+ }
+
+ // Propagate the local URIs to the containers using the configuration.
+ sparkConf.set(SPARK_JARS, localJars)
+
+ case None =>
+ // No configuration, so fall back to uploading local jar files.
+ logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
+ "to uploading libraries under SPARK_HOME.")
+ val jarsDir = new File(sparkConf.getenv("SPARK_HOME"), "lib")
+ if (jarsDir.isDirectory()) {
+ jarsDir.listFiles().foreach { f =>
+ if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
+ distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Copy a few resources to the distributed cache if their scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
* Each resource is represented by a 3-tuple of:
* (1) destination resource name,
@@ -431,8 +487,7 @@ private[spark] class Client(
* (3) Spark property key to set if the scheme is not local
*/
List(
- (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key),
- (APP_JAR_NAME, args.userJar, APP_JAR.key),
+ (APP_JAR_NAME, args.userJar, APP_JAR),
("log4j.properties", oldLog4jConf.orNull, null)
).foreach { case (destName, path, confKey) =>
if (path != null && !path.trim().isEmpty()) {
@@ -1062,8 +1117,7 @@ object Client extends Logging {
new Client(args, sparkConf).run()
}
- // Alias for the Spark assembly jar and the user jar
- val SPARK_JAR_NAME: String = "__spark__.jar"
+ // Alias for the user jar
val APP_JAR_NAME: String = "__app__.jar"
// URI scheme that identifies local resources
@@ -1072,8 +1126,6 @@ object Client extends Logging {
// Staging directory for any temporary jars or files
val SPARK_STAGING: String = ".sparkStaging"
- // Location of any user-defined Spark jars
- val ENV_SPARK_JAR = "SPARK_JAR"
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission =
@@ -1095,28 +1147,8 @@ object Client extends Logging {
// Subdirectory where the user's python files (not archives) will be placed.
val LOCALIZED_PYTHON_DIR = "__pyfiles__"
- /**
- * Find the user-defined Spark jar if configured, or return the jar containing this
- * class if not.
- *
- * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the
- * user environment if that is not found (for backwards compatibility).
- */
- private def sparkJar(conf: SparkConf): String = {
- conf.get(SPARK_JAR).getOrElse(
- if (System.getenv(ENV_SPARK_JAR) != null) {
- logWarning(
- s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
- s"in favor of the ${SPARK_JAR.key} configuration variable.")
- System.getenv(ENV_SPARK_JAR)
- } else {
- SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
- + "find jar containing Spark classes. The jar can be defined using the "
- + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option "
- + "or make sure SPARK_PREPEND_CLASSES is not set."))
- }
- )
- }
+ // Subdirectory where Spark libraries will be placed.
+ val LOCALIZED_LIB_DIR = "__spark_libs__"
/**
* Return the path to the given application's staging directory.
@@ -1236,7 +1268,18 @@ object Client extends Logging {
addFileToClasspath(sparkConf, conf, x, null, env)
}
}
- addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env)
+
+ // Add the Spark jars to the classpath, depending on how they were distributed.
+ addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ LOCALIZED_LIB_DIR, "*"), env)
+ if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+ sparkConf.get(SPARK_JARS).foreach { jars =>
+ jars.filter(isLocalUri).foreach { jar =>
+ addClasspathEntry(getClusterPath(sparkConf, jar), env)
+ }
+ }
+ }
+
populateHadoopClasspath(conf, env)
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1392,4 +1435,9 @@ object Client extends Logging {
components.mkString(Path.SEPARATOR)
}
+ /** Returns whether the URI is a "local:" URI. */
+ def isLocalUri(uri: String): Boolean = {
+ uri.startsWith(s"$LOCAL_SCHEME:")
+ }
+
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 06c1be9bf0..10cd6d00b0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -72,11 +72,17 @@ package object config {
/* File distribution. */
- private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar")
- .doc("Location of the Spark jar to use.")
+ private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
+ .doc("Location of archive containing jars files with Spark classes.")
.stringConf
.optional
+ private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
+ .doc("Location of jars containing Spark classes.")
+ .stringConf
+ .toSequence
+ .optional
+
private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
.stringConf
.optional