aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-03-31 10:27:33 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-03-31 10:27:33 -0700
commit3b3cc76004438a942ecea752db39f3a904a52462 (patch)
tree7e787aa82c5af237445c383410e010ad70903f07 /yarn
parent208fff3ac87f200fd4e6f0407d70bf81cf8c556f (diff)
downloadspark-3b3cc76004438a942ecea752db39f3a904a52462.tar.gz
spark-3b3cc76004438a942ecea752db39f3a904a52462.tar.bz2
spark-3b3cc76004438a942ecea752db39f3a904a52462.zip
[SPARK-14062][YARN] Fix log4j and upload metrics.properties automatically with distributed cache
## What changes were proposed in this pull request? 1. Currently log4j which uses distributed cache only adds to AM's classpath, not executor's, this is introduced in #9118, which breaks the original meaning of that PR, so here add log4j file to the classpath of both AM and executors. 2. Automatically upload metrics.properties to distributed cache, so that it could be used by remote driver and executors implicitly. ## How was this patch tested? Unit test and integration test is done. Author: jerryshao <sshao@hortonworks.com> Closes #11885 from jerryshao/SPARK-14062.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala71
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala3
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala7
3 files changed, 31 insertions, 50 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7b29c1ae4d..f0f13a16e0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -351,14 +351,6 @@ private[spark] class Client(
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
- if (oldLog4jConf.isDefined) {
- logWarning(
- "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
- "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
- "for alternatives.")
- }
-
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -479,25 +471,16 @@ private[spark] class Client(
}
/**
- * Copy a few resources to the distributed cache if their scheme is not "local".
+ * Copy user jar to the distributed cache if their scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
- * Each resource is represented by a 3-tuple of:
- * (1) destination resource name,
- * (2) local path to the resource,
- * (3) Spark property key to set if the scheme is not local
*/
- List(
- (APP_JAR_NAME, args.userJar, APP_JAR),
- ("log4j.properties", oldLog4jConf.orNull, null)
- ).foreach { case (destName, path, confKey) =>
- if (path != null && !path.trim().isEmpty()) {
- val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
- if (isLocal && confKey != null) {
- require(localizedPath != null, s"Path $path already distributed.")
- // If the resource is intended for local use only, handle this downstream
- // by setting the appropriate property
- sparkConf.set(confKey, localizedPath)
- }
+ Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+ val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
+ if (isLocal) {
+ require(localizedPath != null, s"Path $jar already distributed")
+ // If the resource is intended for local use only, handle this downstream
+ // by setting the appropriate property
+ sparkConf.set(APP_JAR, localizedPath)
}
}
@@ -541,11 +524,10 @@ private[spark] class Client(
distribute(f, targetDir = targetDir)
}
- // Distribute an archive with Hadoop and Spark configuration for the AM.
+ // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
resType = LocalResourceType.ARCHIVE,
- destName = Some(LOCALIZED_CONF_DIR),
- appMasterOnly = true)
+ destName = Some(LOCALIZED_CONF_DIR))
require(confLocalizedPath != null)
localResources
@@ -554,10 +536,10 @@ private[spark] class Client(
/**
* Create an archive with the config files for distribution.
*
- * These are only used by the AM, since executors will use the configuration object broadcast by
- * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
- * it when distributing to the AM. This directory is then added to the classpath of the AM
- * process, just to make sure that everybody is using the same default config.
+ * These will be used by AM and executors. The files are zipped and added to the job as an
+ * archive, so that YARN will explode it when distributing to AM and executors. This directory
+ * is then added to the classpath of AM and executor process, just to make sure that everybody
+ * is using the same default config.
*
* This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
* shows up in the classpath before YARN_CONF_DIR.
@@ -576,11 +558,14 @@ private[spark] class Client(
// required when user changes log4j.properties directly to set the log configurations. If
// configuration file is provided through --files then executors will be taking configurations
// from --files instead of $SPARK_CONF_DIR/log4j.properties.
- val log4jFileName = "log4j.properties"
- Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { url =>
- if (url.getProtocol == "file") {
- hadoopConfFiles(log4jFileName) = new File(url.getPath)
- }
+
+ // Also uploading metrics.properties to distributed cache if exists in classpath.
+ // If user specify this file using --files then executors will use the one
+ // from --files instead.
+ for { prop <- Seq("log4j.properties", "metrics.properties")
+ url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+ if url.getProtocol == "file" } {
+ hadoopConfFiles(prop) = new File(url.getPath)
}
Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
@@ -659,7 +644,7 @@ private[spark] class Client(
pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
- populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH))
+ populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -1236,18 +1221,16 @@ object Client extends Logging {
conf: Configuration,
sparkConf: SparkConf,
env: HashMap[String, String],
- isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}
+
addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
- if (isAM) {
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
- LOCALIZED_CONF_DIR, env)
- }
+ addClasspathEntry(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+ LOCALIZED_CONF_DIR, env)
if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
// in order to properly add the app jar when user classpath is first
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index f956a4d1d5..7b55d781f8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable(
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
- Client.populateClasspath(null, yarnConf, sparkConf, env, false,
- sparkConf.get(EXECUTOR_CLASS_PATH))
+ Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e3613a93ed..64723c361c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- populateClasspath(args, conf, sparkConf, env, true)
+ populateClasspath(args, conf, sparkConf, env)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
"/remotePath/1:/remotePath/2")
val env = new MutableHashMap[String, String]()
- populateClasspath(null, conf, sparkConf, env, false,
- extraClassPath = Some("/localPath/my1.jar"))
+ populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
val cp = classpath(env)
cp should contain ("/remotePath/spark.jar")
cp should contain ("/remotePath/my1.jar")
@@ -356,7 +355,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private def classpath(client: Client): Array[String] = {
val env = new MutableHashMap[String, String]()
- populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+ populateClasspath(null, client.hadoopConf, client.sparkConf, env)
classpath(env)
}