author    Marcelo Vanzin <vanzin@cloudera.com>    2014-04-17 10:29:38 -0500
committer Thomas Graves <tgraves@apache.org>      2014-04-17 10:29:38 -0500
commit    69047506bf97e6e37e4079c87cb0327d3760ac41 (patch)
tree      f1ffe2efe61f0eb5c379aab43f35c751164baf2b /yarn/common
parent    bb76eae1b50e4bf18360220110f7d0a4bee672ec (diff)
[SPARK-1395] Allow "local:" URIs to work on Yarn.
This only works for the three paths defined in the environment
(SPARK_JAR, SPARK_YARN_APP_JAR and SPARK_LOG4J_CONF).

Tested by running SparkPi with local: and file: URIs against Yarn cluster
(no "upload" shows up in logs in the local case).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #303 from vanzin/yarn-local and squashes the following commits:

82219c1 [Marcelo Vanzin] [SPARK-1395] Allow "local:" URIs to work on Yarn.
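To illustrate the feature: a "local:" URI names a file already present on every node, so the client skips copying it to the staging directory, while file: and hdfs: URIs are handled as before. Below is a minimal Scala sketch of the scheme check; the example paths are made up for illustration and do not come from the commit.

import java.net.URI

// Minimal sketch of the check ClientBase now performs: only non-"local:"
// URIs need an upload to the distributed cache. Example paths are
// hypothetical.
object LocalUriCheck {
  val LocalScheme = "local"

  def needsUpload(path: String): Boolean =
    !LocalScheme.equals(new URI(path).getScheme())

  def main(args: Array[String]): Unit = {
    println(needsUpload("local:/opt/spark/spark-assembly.jar")) // false: node-local copy is used
    println(needsUpload("file:/home/user/app.jar"))             // true: uploaded to the staging dir
    println(needsUpload("hdfs:///user/spark/app.jar"))          // true: registered from HDFS
  }
}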
Diffstat (limited to 'yarn/common')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala           | 190
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala |  17
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala  |   6
3 files changed, 138 insertions(+), 75 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 628dd98860..566de712fc 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn
import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
Map(
ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
- ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+ ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
).foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (! localPath.isEmpty()) {
val localURI = new URI(localPath)
- val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
- val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- destName, statCache)
+ if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+ val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+ val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ destName, statCache)
+ }
}
}
- // Handle jars local to the ApplicationMaster.
- if ((args.addJars != null) && (!args.addJars.isEmpty())){
- args.addJars.split(',').foreach { case file: String =>
- val localURI = new URI(file.trim())
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- // Only add the resource to the Spark ApplicationMaster.
- val appMasterOnly = true
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- linkname, statCache, appMasterOnly)
- }
- }
-
- // Handle any distributed cache files
- if ((args.files != null) && (!args.files.isEmpty())){
- args.files.split(',').foreach { case file: String =>
- val localURI = new URI(file.trim())
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- linkname, statCache)
- }
- }
-
- // Handle any distributed cache archives
- if ((args.archives != null) && (!args.archives.isEmpty())) {
- args.archives.split(',').foreach { case file:String =>
- val localURI = new URI(file.trim())
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
- linkname, statCache)
+ val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+ (args.files, LocalResourceType.FILE, false),
+ (args.archives, LocalResourceType.ARCHIVE, false) )
+ fileLists.foreach { case (flist, resType, appMasterOnly) =>
+ if (flist != null && !flist.isEmpty()) {
+ flist.split(',').foreach { case file: String =>
+ val localURI = new URI(file.trim())
+ if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+ linkname, statCache, appMasterOnly)
+ }
+ }
}
}
@@ -269,12 +251,14 @@ trait ClientBase extends Logging {
logInfo("Setting up the launch environment")
val env = new HashMap[String, String]()
-
- ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
- env)
+ val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+ ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+ if (log4jConf != null) {
+ env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+ }
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@ trait ClientBase extends Logging {
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
}
-
- if (!localResources.contains(ClientBase.LOG4J_PROP)) {
- JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
- }
+ JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
// Command for the ApplicationMaster
val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
val SPARK_JAR: String = "spark.jar"
val APP_JAR: String = "app.jar"
val LOG4J_PROP: String = "log4j.properties"
+ val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+ val LOCAL_SCHEME = "local"
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
@@ -428,30 +411,113 @@ object ClientBase {
}
}
- def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+ /**
+ * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+ * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+ * is checked.
+ */
+ def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+ var log4jConf = LOG4J_PROP
+ if (!localResources.contains(log4jConf)) {
+ log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+ case conf: String =>
+ val confUri = new URI(conf)
+ if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+ "file://" + confUri.getPath()
+ } else {
+ ClientBase.LOG4J_PROP
+ }
+ case null => "log4j-spark-container.properties"
+ }
+ }
+ " -Dlog4j.configuration=" + log4jConf
+ }
+
+ def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+ log4jConf: String, env: HashMap[String, String]) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
File.pathSeparator)
- // If log4j present, ensure ours overrides all others
- if (addLog4j) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+ if (log4jConf != null) {
+ // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+ // classpath. Note that this only works if the custom config's file name is
+ // "log4j.properties".
+ val localPath = getLocalPath(log4jConf)
+ if (localPath != null) {
+ val parentPath = new File(localPath).getParent()
+ YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+ File.pathSeparator)
+ }
}
// Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + APP_JAR, File.pathSeparator)
+ addUserClasspath(args, env)
}
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+ addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
ClientBase.populateHadoopClasspath(conf, env)
-
if (!userClasspathFirst) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + APP_JAR, File.pathSeparator)
+ addUserClasspath(args, env)
+ }
+ YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+ Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+ }
+
+ /**
+ * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+ * to the classpath.
+ */
+ private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+ if (args != null) {
+ addClasspathEntry(args.userJar, APP_JAR, env)
+ }
+
+ if (args != null && args.addJars != null) {
+ args.addJars.split(",").foreach { case file: String =>
+ addClasspathEntry(file, null, env)
+ }
+ }
+ }
+
+ /**
+ * Adds the given path to the classpath, handling "local:" URIs correctly.
+ *
+ * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+ * name will be added to the classpath (relative to the job's work directory).
+ *
+ * If not a "local:" file and no alternate name, the environment is not modified.
+ *
+ * @param path Path to add to classpath (optional).
+ * @param fileName Alternate name for the file (optional).
+ * @param env Map holding the environment variables.
+ */
+ private def addClasspathEntry(path: String, fileName: String,
+ env: HashMap[String, String]) : Unit = {
+ if (path != null) {
+ scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+ val localPath = getLocalPath(path)
+ if (localPath != null) {
+ YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+ File.pathSeparator)
+ return
+ }
+ }
+ }
+ if (fileName != null) {
+ YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+ Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+ }
+ }
+
+ /**
+ * Returns the local path if the URI is a "local:" URI, or null otherwise.
+ */
+ private def getLocalPath(resource: String): String = {
+ val uri = new URI(resource)
+ if (LOCAL_SCHEME.equals(uri.getScheme())) {
+ return uri.getPath()
}
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
- Path.SEPARATOR + "*", File.pathSeparator)
+ null
}
+
}
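The getLog4jConfiguration method added above is the core of the new log4j handling. The following is a hedged, self-contained sketch of the same resolution order (a staged log4j.properties first, then the SPARK_LOG4J_CONF environment variable, then the bundled container default); it is not the patch's code, and the input value in main is hypothetical.

import java.net.URI

// Sketch of the resolution order implemented by
// ClientBase.getLog4jConfiguration: prefer a staged log4j.properties, then
// the SPARK_LOG4J_CONF env var (translating a "local:" URI into a file://
// URI that log4j can load), else log4j-spark-container.properties.
object Log4jFlagSketch {
  def log4jFlag(staged: Boolean, envConf: String): String = {
    val conf =
      if (staged) {
        "log4j.properties"
      } else Option(envConf) match {
        case Some(c) =>
          val uri = new URI(c)
          if ("local".equals(uri.getScheme())) "file://" + uri.getPath()
          else "log4j.properties"
        case None => "log4j-spark-container.properties"
      }
    " -Dlog4j.configuration=" + conf
  }

  def main(args: Array[String]): Unit = {
    println(log4jFlag(staged = false, envConf = "local:/etc/spark/log4j.properties"))
    // ->  -Dlog4j.configuration=file:///etc/spark/log4j.properties
  }
}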
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 9159cc4ad5..40b38661f7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
hostname: String,
executorMemory: Int,
executorCores: Int,
- userSpecifiedLogFile: Boolean) = {
+ localResources: HashMap[String, LocalResource]) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
@@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging {
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
- if (!userSpecifiedLogFile) {
- JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
- }
+ JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
@@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging {
rtype: LocalResourceType,
localResources: HashMap[String, LocalResource],
timestamp: String,
- size: String,
+ size: String,
vis: String) = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging {
val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
for( i <- 0 to distArchives.length - 1) {
- setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+ setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
timeStamps(i), fileSizes(i), visibilities(i))
}
}
@@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging {
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+ val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+ ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+ if (log4jConf != null) {
+ env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+ }
// Allow users to specify some environment variables
YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
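As the hunk above shows, prepareEnvironment now re-exports SPARK_LOG4J_CONF so that executors resolve the same log4j flag as the client and the ApplicationMaster. A minimal sketch of that propagation pattern follows; it is an illustration under assumed values, not the patch's code.

import scala.collection.mutable.HashMap

// Hedged sketch of the propagation pattern above: the same SPARK_LOG4J_CONF
// value is re-exported at each hop, so client, ApplicationMaster, and
// executors all resolve the identical log4j flag. The sample value is
// illustrative.
object EnvPropagationSketch {
  def forward(parentEnv: Map[String, String], childEnv: HashMap[String, String]): Unit = {
    parentEnv.get("SPARK_LOG4J_CONF").foreach { conf =>
      childEnv("SPARK_LOG4J_CONF") = conf
    }
  }

  def main(args: Array[String]): Unit = {
    val amEnv = new HashMap[String, String]()
    forward(Map("SPARK_LOG4J_CONF" -> "local:/etc/spark/log4j.properties"), amEnv)

    val executorEnv = new HashMap[String, String]()
    forward(amEnv.toMap, executorEnv)
    println(executorEnv("SPARK_LOG4J_CONF")) // local:/etc/spark/log4j.properties
  }
}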
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4ceed95a25..832d45b3ad 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
- override def getCurrentUserCredentials(): Credentials = {
+ override def getCurrentUserCredentials(): Credentials = {
UserGroupInformation.getCurrentUser().getCredentials()
}
@@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
- def getLoggingArgsForContainerCommandLine(): String = {
- "-Dlog4j.configuration=log4j-spark-container.properties"
- }
-
def addToEnvironment(
env: HashMap[String, String],
variable: String,
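For reference, the classpath additions throughout this patch go through YarnSparkHadoopUtil.addToEnvironment, whose signature is truncated above. The following is a hedged sketch of the append-with-separator behavior it provides; the parameter names are assumptions for illustration, not the method's exact signature.

import java.io.File
import scala.collection.mutable.HashMap

// Hedged sketch of the append-with-separator behavior that
// YarnSparkHadoopUtil.addToEnvironment provides: if the variable is already
// set, the new value is appended behind the separator.
object AddToEnvSketch {
  def addToEnvironment(env: HashMap[String, String], variable: String,
      value: String, separator: String): Unit = {
    env(variable) = env.get(variable) match {
      case Some(existing) => existing + separator + value
      case None => value
    }
  }

  def main(args: Array[String]): Unit = {
    val env = new HashMap[String, String]()
    addToEnvironment(env, "CLASSPATH", "/opt/spark/conf", File.pathSeparator)
    addToEnvironment(env, "CLASSPATH", "/opt/spark/lib/*", File.pathSeparator)
    println(env("CLASSPATH")) // /opt/spark/conf:/opt/spark/lib/* (on Unix)
  }
}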