From 3b3cc76004438a942ecea752db39f3a904a52462 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 31 Mar 2016 10:27:33 -0700
Subject: [SPARK-14062][YARN] Fix log4j and upload metrics.properties automatically with distributed cache

## What changes were proposed in this pull request?

1. Currently a log4j file shipped through the distributed cache is added only to the AM's classpath, not the executors'. This behavior was introduced in #9118 and breaks the original intent of that PR, so this change adds the log4j file to the classpath of both the AM and the executors.
2. Automatically upload metrics.properties to the distributed cache, so that the remote driver and executors can use it implicitly.

## How was this patch tested?

Unit tests and an integration test were run.

Author: jerryshao

Closes #11885 from jerryshao/SPARK-14062.
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 71 ++++++++--------------
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  3 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala |  7 +--
 3 files changed, 31 insertions(+), 50 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7b29c1ae4d..f0f13a16e0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -351,14 +351,6 @@ private[spark] class Client(
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
-    if (oldLog4jConf.isDefined) {
-      logWarning(
-        "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
-          "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
-          "for alternatives.")
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -479,25 +471,16 @@ private[spark] class Client(
     }
 
     /**
-     * Copy a few resources to the distributed cache if their scheme is not "local".
+     * Copy the user jar to the distributed cache if its scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
-     * Each resource is represented by a 3-tuple of:
-     * (1) destination resource name,
-     * (2) local path to the resource,
-     * (3) Spark property key to set if the scheme is not local
      */
-    List(
-      (APP_JAR_NAME, args.userJar, APP_JAR),
-      ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, path, confKey) =>
-      if (path != null && !path.trim().isEmpty()) {
-        val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
-        if (isLocal && confKey != null) {
-          require(localizedPath != null, s"Path $path already distributed.")
-          // If the resource is intended for local use only, handle this downstream
-          // by setting the appropriate property
-          sparkConf.set(confKey, localizedPath)
-        }
+    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+      val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
+      if (isLocal) {
+        require(localizedPath != null, s"Path $jar already distributed")
+        // If the resource is intended for local use only, handle this downstream
+        // by setting the appropriate property
+        sparkConf.set(APP_JAR, localizedPath)
       }
     }
 
@@ -541,11 +524,10 @@ private[spark] class Client(
       distribute(f, targetDir = targetDir)
     }
 
-    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
     val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
       resType = LocalResourceType.ARCHIVE,
-      destName = Some(LOCALIZED_CONF_DIR),
-      appMasterOnly = true)
+      destName = Some(LOCALIZED_CONF_DIR))
     require(confLocalizedPath != null)
 
     localResources
   }
 
@@ -554,10 +536,10 @@ private[spark] class Client(
   /**
    * Create an archive with the config files for distribution.
    *
-   * These are only used by the AM, since executors will use the configuration object broadcast by
-   * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
-   * it when distributing to the AM. This directory is then added to the classpath of the AM
-   * process, just to make sure that everybody is using the same default config.
+   * These will be used by both the AM and executors. The files are zipped and added to the job
+   * as an archive, so that YARN will explode it when distributing to the AM and executors. This
+   * directory is then added to the classpath of the AM and executor processes, just to make
+   * sure that everybody is using the same default config.
    *
    * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
    * shows up in the classpath before YARN_CONF_DIR.
@@ -576,11 +558,14 @@ private[spark] class Client(
     // required when user changes log4j.properties directly to set the log configurations. If
     // configuration file is provided through --files then executors will be taking configurations
     // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-    val log4jFileName = "log4j.properties"
-    Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { url =>
-      if (url.getProtocol == "file") {
-        hadoopConfFiles(log4jFileName) = new File(url.getPath)
-      }
+
+    // Also upload metrics.properties to the distributed cache if it exists in the classpath.
+    // If the user specifies this file using --files then executors will use the one
+    // from --files instead.
+    for { prop <- Seq("log4j.properties", "metrics.properties")
+          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+          if url.getProtocol == "file" } {
+      hadoopConfFiles(prop) = new File(url.getPath)
     }
 
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
@@ -659,7 +644,7 @@ private[spark] class Client(
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-    populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH))
+    populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -1236,18 +1221,16 @@ object Client extends Logging {
       conf: Configuration,
       sparkConf: SparkConf,
       env: HashMap[String, String],
-      isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
+    addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
 
-    if (isAM) {
-      addClasspathEntry(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-          LOCALIZED_CONF_DIR, env)
-    }
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+        LOCALIZED_CONF_DIR, env)
 
     if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
       // in order to properly add the app jar when user classpath is first
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index f956a4d1d5..7b55d781f8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable(
 
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    Client.populateClasspath(null, yarnConf, sparkConf, env, false,
-      sparkConf.get(EXECUTOR_CLASS_PATH))
+    Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e3613a93ed..64723c361c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    populateClasspath(args, conf, sparkConf, env, true)
+    populateClasspath(args, conf, sparkConf, env)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       "/remotePath/1:/remotePath/2")
 
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, conf, sparkConf, env, false,
-      extraClassPath = Some("/localPath/my1.jar"))
+    populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
 
     val cp = classpath(env)
     cp should contain ("/remotePath/spark.jar")
     cp should contain ("/remotePath/my1.jar")
@@ -356,7 +355,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
   private def classpath(client: Client): Array[String] = {
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
     classpath(env)
   }
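
To make the classpath change concrete, here is a minimal, self-contained Scala sketch (not Spark's actual code) of what `populateClasspath` amounts to after the `isAM` flag is removed. The names `LocalizedConfDir` and the plain `:` separator are simplifying assumptions; the real code uses `LOCALIZED_CONF_DIR`, `Path.SEPARATOR`, and YARN's cross-platform classpath separator.

```scala
import scala.collection.mutable.HashMap

object ClasspathSketch {
  // Mirrors LOCALIZED_CONF_DIR in the patch: the conf archive is unpacked
  // under this name inside each container's working directory.
  val LocalizedConfDir = "__spark_conf__"
  // YARN expands {{PWD}} to the container's working directory at launch time.
  val Pwd = "{{PWD}}"

  // Simplified separator handling; real code uses YARN's <CPS> token.
  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
    env("CLASSPATH") = env.get("CLASSPATH").map(_ + ":" + path).getOrElse(path)

  // After this patch there is no isAM branch: both the AM and executors get
  // the localized conf dir, so the log4j.properties and metrics.properties
  // shipped in the archive are visible everywhere.
  def populateClasspath(env: HashMap[String, String]): Unit = {
    addClasspathEntry(Pwd, env)
    addClasspathEntry(Pwd + "/" + LocalizedConfDir, env)
  }

  def main(args: Array[String]): Unit = {
    val env = HashMap.empty[String, String]
    populateClasspath(env)
    println(env("CLASSPATH")) // {{PWD}}:{{PWD}}/__spark_conf__
  }
}
```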
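The new for-comprehension in `createConfArchive` generalizes the old log4j-only lookup to both config files. A runnable approximation of that lookup, using a plain context classloader in place of Spark's `Utils.getContextOrSparkClassLoader`:

```scala
import java.io.File
import scala.collection.mutable

object ConfFileLookup {
  def main(args: Array[String]): Unit = {
    val hadoopConfFiles = mutable.HashMap.empty[String, File]
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

    // For each well-known config file, pick it up only when the classpath
    // serves it from a plain file; resources packaged inside jars report a
    // "jar" protocol and are skipped, just like the url.getProtocol check
    // in the patch.
    for {
      prop <- Seq("log4j.properties", "metrics.properties")
      url <- Option(loader.getResource(prop))
      if url.getProtocol == "file"
    } {
      hadoopConfFiles(prop) = new File(url.getPath)
    }

    hadoopConfFiles.foreach { case (name, file) => println(s"$name -> $file") }
  }
}
```

Files found this way end up in the conf archive, while anything the user passes explicitly via `--files` still wins, as the patch's comments note.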
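The refactored user-jar handling drops the old 3-tuple list, since the log4j entry disappeared along with `SPARK_LOG4J_CONF`. A sketch of the remaining logic's shape, with a stubbed `distribute()` and an assumed stand-in config key (the real key behind `APP_JAR` may differ):

```scala
object UserJarSketch {
  // Stand-in for the patch's distribute(): returns (isLocal, localizedPath).
  // A "local:" URI means the file is already present on every node, so it is
  // not copied to the distributed cache; destName is unused in this stub.
  private def distribute(path: String, destName: String): (Boolean, String) =
    if (path.startsWith("local:")) (true, path.stripPrefix("local:"))
    else (false, null)

  def main(args: Array[String]): Unit = {
    val sparkConf = scala.collection.mutable.HashMap.empty[String, String]
    val userJar = "local:/opt/app/app.jar"

    // Mirrors the patch: skip a null or blank jar, distribute it otherwise,
    // and record the conf key only when the jar is local-only.
    Option(userJar).filter(_.trim.nonEmpty).foreach { jar =>
      val (isLocal, localizedPath) = distribute(jar, destName = "__app__.jar")
      if (isLocal) {
        require(localizedPath != null, s"Path $jar already distributed")
        sparkConf("spark.yarn.user.jar") = localizedPath // assumed key
      }
    }
    println(sparkConf)
  }
}
```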