From 69047506bf97e6e37e4079c87cb0327d3760ac41 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Thu, 17 Apr 2014 10:29:38 -0500
Subject: [SPARK-1395] Allow "local:" URIs to work on Yarn.

This only works for the three paths defined in the environment
(SPARK_JAR, SPARK_YARN_APP_JAR and SPARK_LOG4J_CONF).

Tested by running SparkPi with local: and file: URIs against a Yarn
cluster (no "upload" shows up in logs in the local case).

Author: Marcelo Vanzin

Closes #303 from vanzin/yarn-local and squashes the following commits:

82219c1 [Marcelo Vanzin] [SPARK-1395] Allow "local:" URIs to work on Yarn.
---
 .../spark/deploy/yarn/ExecutorRunnable.scala       |   2 +-
 .../org/apache/spark/deploy/yarn/ClientBase.scala  | 190 ++++++++++++++-------
 .../spark/deploy/yarn/ExecutorRunnableUtil.scala   |  17 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |   6 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala       |   2 +-
 5 files changed, 140 insertions(+), 77 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3469b7dece..7dae248e3e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 628dd98860..566de712fc 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
       }
     }
 
@@ -269,12 +251,14 @@ trait ClientBase extends Logging {
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@ trait ClientBase extends Logging {
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
     }
-
-    if (!localResources.contains(ClientBase.LOG4J_PROP)) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
   val SPARK_JAR: String = "spark.jar"
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+  val LOCAL_SCHEME = "local"
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
@@ -428,30 +411,113 @@ object ClientBase {
     }
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+  /**
+   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+   * is checked.
+   */
+  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+    var log4jConf = LOG4J_PROP
+    if (!localResources.contains(log4jConf)) {
+      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+        case conf: String =>
+          val confUri = new URI(conf)
+          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+            "file://" + confUri.getPath()
+          } else {
+            ClientBase.LOG4J_PROP
+          }
+        case null => "log4j-spark-container.properties"
+      }
+    }
+    " -Dlog4j.configuration=" + log4jConf
+  }
+
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      log4jConf: String, env: HashMap[String, String]) {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
       File.pathSeparator)
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+    if (log4jConf != null) {
+      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+      // classpath. Note that this only works if the custom config's file name is
+      // "log4j.properties".
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
 
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
+    }
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+  }
+
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+ */ + private def getLocalPath(resource: String): String = { + val uri = new URI(resource) + if (LOCAL_SCHEME.equals(uri.getScheme())) { + return uri.getPath() } - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + - Path.SEPARATOR + "*", File.pathSeparator) + null } + } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index 9159cc4ad5..40b38661f7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging { hostname: String, executorMemory: Int, executorCores: Int, - userSpecifiedLogFile: Boolean) = { + localResources: HashMap[String, LocalResource]) = { // Extra options for the JVM var JAVA_OPTS = "" // Set the JVM memory @@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging { JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - - if (!userSpecifiedLogFile) { - JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine() - } + JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources) // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out. @@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging { rtype: LocalResourceType, localResources: HashMap[String, LocalResource], timestamp: String, - size: String, + size: String, vis: String) = { val uri = new URI(file) val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] @@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging { val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') for( i <- 0 to distArchives.length - 1) { - setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, + setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, timeStamps(i), fileSizes(i), visibilities(i)) } } @@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging { def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) + val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY) + ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env) + if (log4jConf != null) { + env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf + } // Allow users to specify some environment variables YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"), diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4ceed95a25..832d45b3ad 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } - override def getCurrentUserCredentials(): Credentials = { + override def getCurrentUserCredentials(): 
Credentials = { UserGroupInformation.getCurrentUser().getCredentials() } @@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } object YarnSparkHadoopUtil { - def getLoggingArgsForContainerCommandLine(): String = { - "-Dlog4j.configuration=log4j-spark-container.properties" - } - def addToEnvironment( env: HashMap[String, String], variable: String, diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 81d9d1b5c9..117b33f466 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -79,7 +79,7 @@ class ExecutorRunnable( ctx.setTokens(ByteBuffer.wrap(dob.getData())) val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores, - localResources.contains(ClientBase.LOG4J_PROP)) + localResources) logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) -- cgit v1.2.3
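For readers skimming the patch, the behavior change is small: any URI with the
"local:" scheme is treated as already present on every node, so the client puts
it straight on the container classpath instead of uploading it to the YARN
staging directory. Below is a minimal, self-contained Scala sketch of that
scheme check. The names mirror ClientBase.getLocalPath()/LOCAL_SCHEME from the
patch, but this standalone version returns an Option instead of null, and the
example paths are made up; it is illustrative only, not the shipped code.

import java.net.{URI, URISyntaxException}

object LocalUriSketch {
  val LOCAL_SCHEME = "local"

  // Returns the node-local path for "local:" URIs, or None for anything else
  // (hdfs://, file://, plain paths), which the client would upload to the
  // staging directory and distribute through the YARN distributed cache.
  def getLocalPath(resource: String): Option[String] =
    try {
      val uri = new URI(resource)
      if (LOCAL_SCHEME.equals(uri.getScheme())) Some(uri.getPath()) else None
    } catch {
      case _: URISyntaxException => None
    }

  def main(args: Array[String]): Unit = {
    // Already on every node: goes on the classpath as-is, no upload.
    println(getLocalPath("local:/opt/spark/spark-assembly.jar")) // Some(/opt/spark/spark-assembly.jar)
    // Remote scheme: copied to the staging directory and localized by YARN.
    println(getLocalPath("hdfs:///user/vanzin/app.jar"))         // None
  }
}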