From 0fff4ee8523ff4137eedfc314b51135427137c63 Mon Sep 17 00:00:00 2001
From: tgravescs
Date: Thu, 3 Oct 2013 11:52:16 -0500
Subject: Adding in the --addJars option to make SparkContext.addJar work on yarn and cleanup the classpaths

---
 .../org/apache/spark/deploy/yarn/Client.scala      | 27 ++++++++++++++++------
 .../apache/spark/deploy/yarn/ClientArguments.scala | 10 ++++++--
 .../apache/spark/deploy/yarn/WorkerRunnable.scala  | 17 +++++---------
 3 files changed, 34 insertions(+), 20 deletions(-)

(limited to 'yarn')

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4d243ed7a..fb1b339f27 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,7 +127,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       originalPath: Path,
       replication: Short,
       localResources: HashMap[String,LocalResource],
-      fragment: String) = {
+      fragment: String,
+      appMasterOnly: Boolean = false): Unit = {
     val fs = FileSystem.get(conf)
     val newPath = new Path(dstDir, originalPath.getName())
     logInfo("Uploading " + originalPath + " to " + newPath)
@@ -149,6 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       pathURI = new URI(newPath.toString() + "#" + fragment);
     }
     val distPath = pathURI.toString()
+    if (appMasterOnly == true) return
     if (resourceType == LocalResourceType.FILE) {
       distFiles match {
         case Some(path) =>
@@ -223,6 +225,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
 
+    // handle any add jars
+    if ((args.addJars != null) && (!args.addJars.isEmpty())){
+      args.addJars.split(',').foreach { case file: String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+          tmpURI.getFragment(), true)
+      }
+    }
+
     // handle any distributed cache files
     if ((args.files != null) && (!args.files.isEmpty())){
       args.files.split(',').foreach { case file: String =>
@@ -253,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    // If log4j present, ensure ours overrides all others
-    if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*")
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
     Client.populateHadoopClasspath(yarnConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
@@ -279,6 +290,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
     }
 
+    // set the environment variables to be passed on to the Workers
     if (distFiles != None) {
       env("SPARK_YARN_CACHE_FILES") = distFiles.get
       env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
@@ -328,8 +340,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     // Add Xmx for am memory
     JAVA_OPTS += "-Xmx" + amMemory + "m "
 
-    JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
-      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
@@ -345,6 +357,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
     }
+
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
     }
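
Note (illustrative, not part of the patch): the addJars block added above
splits a comma-separated list of jar URIs, and an optional '#fragment' on an
entry chooses the name the jar is linked under when localized. A minimal,
self-contained Scala sketch of that parsing, using a hypothetical sample value:

    import java.net.URI

    object AddJarsParseSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical --addJars value; the second entry carries a fragment.
        val addJars = "hdfs://nn:8020/libs/a.jar,file:///tmp/b.jar#renamed.jar"
        addJars.split(',').foreach { file =>
          val uri = new URI(file)
          val baseName = new java.io.File(uri.getPath).getName
          // Fall back to the jar's own file name when no fragment is given,
          // as copyLocalFile does via originalPath.getName().
          val linkName = Option(uri.getFragment).getOrElse(baseName)
          println("upload " + uri.getPath + " -> link as " + linkName)
        }
      }
    }

As I read the early return added to copyLocalFile, appMasterOnly jars are
uploaded but kept out of the distributed-cache lists forwarded to the workers,
so they are localized only for the application master, where
SparkContext.addJar can then serve them.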
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 30d9b6e60f..0833153541 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
 class ClientArguments(val args: Array[String]) {
+  var addJars: String = null
   var files: String = null
   var archives: String = null
   var userJar: String = null
@@ -80,6 +81,10 @@ class ClientArguments(val args: Array[String]) {
           amQueue = value
           args = tail
 
+        case ("--addJars") :: value :: tail =>
+          addJars = value
+          args = tail
+
         case ("--files") :: value :: tail =>
           files = value
           args = tail
@@ -119,8 +124,9 @@ class ClientArguments(val args: Array[String]) {
      "  --master-memory MEM    Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
      "  --worker-memory MEM    Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
      "  --queue QUEUE          The hadoop queue to use for allocation requests (Default: 'default')\n" +
-      "  --files files          Comma separated list of files to be distributed with the job.\n" +
-      "  --archives archives    Comma separated list of archives to be distributed with the job."
+      "  --addJars jars         Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+      "  --files files          Comma separated list of files to be distributed with the job.\n" +
+      "  --archives archives    Comma separated list of archives to be distributed with the job."
      )
    System.exit(exitCode)
  }

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index d340b114df..8dac9e02ac 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
     }
 
-    JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
-      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
     // node, spark gc effects all other containers performance (which can also be other spark containers)
@@ -215,15 +216,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    // If log4j present, ensure ours overrides all others
-    if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
-      // Which is correct ?
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
-    }
-
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*")
     Client.populateHadoopClasspath(yarnConf, env)
 
     // allow users to specify some environment variables
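
Note (illustrative, not part of the patch): Client and WorkerRunnable now build
the container classpath the same way, from YARN's PWD variable instead of a
bare "./". A self-contained sketch of that setup, assuming a Hadoop 2.x YARN
client on the classpath:

    import scala.collection.JavaConversions._
    import scala.collection.mutable.HashMap

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
    import org.apache.hadoop.yarn.util.Apps

    object ClasspathSketch {
      def main(args: Array[String]): Unit = {
        val env = new HashMap[String, String]()
        // Environment.PWD.$() emits the literal "$PWD", which the NodeManager
        // expands when it launches the container; "$PWD/*" then picks up every
        // jar localized into the working directory, such as jars shipped with
        // --addJars.
        Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
        Apps.addToEnvironment(env, Environment.CLASSPATH.name,
          Environment.PWD.$() + Path.SEPARATOR + "*")
        println(env(Environment.CLASSPATH.name))  // e.g. $PWD:$PWD/*
      }
    }

On the client side the new flag would be passed as, for example,
--addJars hdfs://nn:8020/libs/a.jar,file:///tmp/b.jar (a hypothetical
invocation), matching the usage text added to ClientArguments above.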
--
cgit v1.2.3