Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala           | 71
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala |  3
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala      |  7
3 files changed, 31 insertions(+), 50 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7b29c1ae4d..f0f13a16e0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -351,14 +351,6 @@ private[spark] class Client(
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
- if (oldLog4jConf.isDefined) {
- logWarning(
- "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
- "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
- "for alternatives.")
- }
-
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
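The addDistributedUri helper shown above guards against adding the same path to the distributed cache twice. A minimal, self-contained sketch of that dedup idea, with println standing in for Spark's logWarning and the duplicate-path message modeled on the surrounding code:

object DedupSketch {
  private val distributedUris = scala.collection.mutable.HashSet.empty[String]

  def addDistributedUri(uri: java.net.URI): Boolean = {
    val uriStr = uri.toString()
    if (distributedUris.contains(uriStr)) {
      println(s"Same path resource $uri added multiple times to distributed cache.")
      false
    } else {
      distributedUris += uriStr
      true
    }
  }

  def main(args: Array[String]): Unit = {
    println(addDistributedUri(new java.net.URI("hdfs:///tmp/app.jar"))) // true
    println(addDistributedUri(new java.net.URI("hdfs:///tmp/app.jar"))) // false: duplicate
  }
}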
@@ -479,25 +471,16 @@ private[spark] class Client(
}
/**
- * Copy a few resources to the distributed cache if their scheme is not "local".
+ * Copy the user jar to the distributed cache if its scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
- * Each resource is represented by a 3-tuple of:
- * (1) destination resource name,
- * (2) local path to the resource,
- * (3) Spark property key to set if the scheme is not local
*/
- List(
- (APP_JAR_NAME, args.userJar, APP_JAR),
- ("log4j.properties", oldLog4jConf.orNull, null)
- ).foreach { case (destName, path, confKey) =>
- if (path != null && !path.trim().isEmpty()) {
- val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
- if (isLocal && confKey != null) {
- require(localizedPath != null, s"Path $path already distributed.")
- // If the resource is intended for local use only, handle this downstream
- // by setting the appropriate property
- sparkConf.set(confKey, localizedPath)
- }
+ Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+ val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
+ if (isLocal) {
+ require(localizedPath != null, s"Path $jar already distributed")
+ // If the resource is intended for local use only, handle this downstream
+ // by setting the appropriate property
+ sparkConf.set(APP_JAR, localizedPath)
}
}
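The hunk above replaces a null-tolerant list of (name, path, confKey) tuples with a single Option-based guard for the user jar. A hedged, runnable sketch of just that idiom; the distribute stub below is a hypothetical stand-in for Client.distribute and the APP_JAR handling:

object UserJarSketch {
  // Hypothetical stand-in: reports whether the path uses the "local:" scheme
  // and returns a localized form of it.
  private def distribute(path: String): (Boolean, String) =
    (path.startsWith("local:"), path.stripPrefix("local:"))

  def main(args: Array[String]): Unit = {
    val userJar: String = "local:/opt/app/app.jar" // may also be null or ""
    // Option(...) absorbs null and filter(...) absorbs blank strings, so the
    // body runs only for a usable path -- replacing the old explicit checks
    // `path != null && !path.trim().isEmpty()`.
    Option(userJar).filter(_.trim.nonEmpty).foreach { jar =>
      val (isLocal, localizedPath) = distribute(jar)
      if (isLocal) {
        require(localizedPath != null, s"Path $jar already distributed")
        println(s"would set the APP_JAR conf key to $localizedPath")
      }
    }
  }
}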
@@ -541,11 +524,10 @@ private[spark] class Client(
distribute(f, targetDir = targetDir)
}
- // Distribute an archive with Hadoop and Spark configuration for the AM.
+ // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
resType = LocalResourceType.ARCHIVE,
- destName = Some(LOCALIZED_CONF_DIR),
- appMasterOnly = true)
+ destName = Some(LOCALIZED_CONF_DIR))
require(confLocalizedPath != null)
localResources
@@ -554,10 +536,10 @@ private[spark] class Client(
/**
* Create an archive with the config files for distribution.
*
- * These are only used by the AM, since executors will use the configuration object broadcast by
- * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
- * it when distributing to the AM. This directory is then added to the classpath of the AM
- * process, just to make sure that everybody is using the same default config.
+ * These will be used by the AM and executors. The files are zipped and added to the job as an
+ * archive, so that YARN will explode it when distributing it to the AM and executors. This
+ * directory is then added to the classpath of the AM and executor processes, just to make sure
+ * that everybody is using the same default config.
*
* This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
* shows up in the classpath before YARN_CONF_DIR.
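The doc comment above describes the mechanism that makes this work: the config files are zipped into one archive, and YARN explodes ARCHIVE-type resources into a directory (here LOCALIZED_CONF_DIR) that can then sit on each container's classpath. A minimal sketch of the zipping step, assuming a flat list of files; zipAll is an illustrative name, not the method in Client.scala:

import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.zip.{ZipEntry, ZipOutputStream}

object ConfArchiveSketch {
  // Zip a flat list of config files into a single archive.
  def zipAll(files: Seq[File], out: File): Unit = {
    val zos = new ZipOutputStream(new FileOutputStream(out))
    try {
      files.filter(_.isFile).foreach { f =>
        zos.putNextEntry(new ZipEntry(f.getName))
        Files.copy(f.toPath, zos) // stream the file's bytes into the current entry
        zos.closeEntry()
      }
    } finally {
      zos.close()
    }
  }
}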
@@ -576,11 +558,14 @@ private[spark] class Client(
// required when user changes log4j.properties directly to set the log configurations. If
// configuration file is provided through --files then executors will be taking configurations
// from --files instead of $SPARK_CONF_DIR/log4j.properties.
- val log4jFileName = "log4j.properties"
- Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { url =>
- if (url.getProtocol == "file") {
- hadoopConfFiles(log4jFileName) = new File(url.getPath)
- }
+
+ // Also upload metrics.properties to the distributed cache if it exists in the
+ // classpath. If the user specifies this file using --files, then executors will
+ // use the one from --files instead.
+ for { prop <- Seq("log4j.properties", "metrics.properties")
+ url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+ if url.getProtocol == "file" } {
+ hadoopConfFiles(prop) = new File(url.getPath)
}
Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
@@ -659,7 +644,7 @@ private[spark] class Client(
pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
- populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH))
+ populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -1236,18 +1221,16 @@ object Client extends Logging {
conf: Configuration,
sparkConf: SparkConf,
env: HashMap[String, String],
- isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}
+
addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
- if (isAM) {
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
- LOCALIZED_CONF_DIR, env)
- }
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+ LOCALIZED_CONF_DIR, env)
if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
// in order to properly add the app jar when user classpath is first
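With the isAM parameter removed, populateClasspath emits the same layout for every container. A hedged sketch of the resulting entries; addClasspathEntry here is a hypothetical stand-in for the private helper, "{{PWD}}" illustrates what expandEnvironment(Environment.PWD) yields, and "/" stands in for Path.SEPARATOR:

object ClasspathSketch {
  val LOCALIZED_CONF_DIR = "__spark_conf__" // name of the exploded conf archive dir

  // Hypothetical stand-in for Client.addClasspathEntry: appends to
  // env("CLASSPATH") using <CPS>, YARN's cross-platform separator token.
  def addClasspathEntry(entry: String,
      env: scala.collection.mutable.HashMap[String, String]): Unit =
    env("CLASSPATH") = env.get("CLASSPATH").fold(entry)(_ + "<CPS>" + entry)

  def main(args: Array[String]): Unit = {
    val env = scala.collection.mutable.HashMap.empty[String, String]
    val pwd = "{{PWD}}" // illustrative result of expandEnvironment(Environment.PWD)
    addClasspathEntry(pwd, env)
    // No isAM guard anymore: the conf dir entry is added for every container.
    addClasspathEntry(pwd + "/" + LOCALIZED_CONF_DIR, env)
    println(env("CLASSPATH")) // {{PWD}}<CPS>{{PWD}}/__spark_conf__
  }
}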
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index f956a4d1d5..7b55d781f8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable(
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(null, yarnConf, sparkConf, env, false,
- sparkConf.get(EXECUTOR_CLASS_PATH))
+ Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e3613a93ed..64723c361c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- populateClasspath(args, conf, sparkConf, env, true)
+ populateClasspath(args, conf, sparkConf, env)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
"/remotePath/1:/remotePath/2")
val env = new MutableHashMap[String, String]()
- populateClasspath(null, conf, sparkConf, env, false,
- extraClassPath = Some("/localPath/my1.jar"))
+ populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
val cp = classpath(env)
cp should contain ("/remotePath/spark.jar")
cp should contain ("/remotePath/my1.jar")
@@ -356,7 +355,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private def classpath(client: Client): Array[String] = {
val env = new MutableHashMap[String, String]()
- populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+ populateClasspath(null, client.hadoopConf, client.sparkConf, env)
classpath(env)
}
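A hedged sketch of the property these updated tests lean on: with a single populateClasspath signature, the localized conf dir should show up on the classpath for AM-style and executor-style calls alike. The splitter regex mirrors the one used in ClientSuite; classpathOf and the sample env are illustrative:

object ClasspathCheckSketch {
  def classpathOf(env: scala.collection.Map[String, String]): Array[String] =
    env("CLASSPATH").split(":|;|<CPS>")

  def main(args: Array[String]): Unit = {
    val env = scala.collection.mutable.HashMap(
      "CLASSPATH" -> "{{PWD}}<CPS>{{PWD}}/__spark_conf__")
    assert(classpathOf(env).contains("{{PWD}}/__spark_conf__"))
    println("conf dir present on the container classpath")
  }
}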