authortgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-10-03 11:52:16 -0500
committertgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-10-03 11:52:16 -0500
commit0fff4ee8523ff4137eedfc314b51135427137c63 (patch)
tree9f96cb33c2d5c3dc54b34a4716052957a1841633 /yarn
parent9d4246863a25f7c91f324e004fe000b9848f6057 (diff)
Add the --addJars option to make SparkContext.addJar work on YARN and clean up the classpaths
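In rough terms: jars passed via --addJars are uploaded and localized into the application master's working directory (and only there), and that directory is now on the container classpath, so a driver running under YARN can register them at runtime. A hypothetical end-to-end sketch follows; the jar name and the SparkContext value `sc` are illustrative assumptions, not part of this commit:

    // Submitted with:  --addJars /local/path/extra.jar        (hypothetical)
    // YARN localizes the jar into the AM container's working directory,
    // which the classpath changes below add to CLASSPATH, so the driver
    // can hand it to executors by its localized name:
    sc.addJar("extra.jar")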
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 27
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala | 17
3 files changed, 34 insertions(+), 20 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4d243ed7a..fb1b339f27 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,7 +127,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
originalPath: Path,
replication: Short,
localResources: HashMap[String,LocalResource],
- fragment: String) = {
+ fragment: String,
+ appMasterOnly: Boolean = false): Unit = {
val fs = FileSystem.get(conf)
val newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
@@ -149,6 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
pathURI = new URI(newPath.toString() + "#" + fragment);
}
val distPath = pathURI.toString()
+ if (appMasterOnly) return
if (resourceType == LocalResourceType.FILE) {
distFiles match {
case Some(path) =>
@@ -223,6 +225,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
+ // handle jars passed in through --addJars
+ if ((args.addJars != null) && (!args.addJars.isEmpty())) {
+ args.addJars.split(',').foreach { case file: String =>
+ val tmpURI = new URI(file)
+ val tmp = new Path(tmpURI)
+ copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+ tmpURI.getFragment(), true)
+ }
+ }
+
// handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
args.files.split(',').foreach { case file: String =>
@@ -253,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
- // If log4j present, ensure ours overrides all others
- if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+ Environment.PWD.$() + Path.SEPARATOR + "*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
@@ -279,6 +290,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
+ // set the environment variables to be passed on to the Workers
if (distFiles != None) {
env("SPARK_YARN_CACHE_FILES") = distFiles.get
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
@@ -328,8 +340,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add Xmx for am memory
JAVA_OPTS += "-Xmx" + amMemory + "m "
- JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
@@ -345,6 +357,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
}
+
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
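A note on the new appMasterOnly flag above: --addJars entries are uploaded and registered as local resources for the application master, and the early return skips the distFiles bookkeeping, so they are not forwarded to the workers' distributed-cache lists. The loop also relies on URI fragments to pick the link name a file is localized under; a minimal standalone sketch of that behavior (paths hypothetical):

    // A "#fragment" selects the name the file appears under after localization.
    val tmpURI = new java.net.URI("/local/libs/dep-1.0.jar#dep.jar")
    val tmp = new org.apache.hadoop.fs.Path(tmpURI)
    println(tmp.getName())          // "dep-1.0.jar"  (the real file name)
    println(tmpURI.getFragment())   // "dep.jar"      (the link name requested)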
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 30d9b6e60f..0833153541 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
class ClientArguments(val args: Array[String]) {
+ var addJars: String = null
var files: String = null
var archives: String = null
var userJar: String = null
@@ -80,6 +81,10 @@ class ClientArguments(val args: Array[String]) {
amQueue = value
args = tail
+ case ("--addJars") :: value :: tail =>
+ addJars = value
+ args = tail
+
case ("--files") :: value :: tail =>
files = value
args = tail
@@ -119,8 +124,9 @@ class ClientArguments(val args: Array[String]) {
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --files file Comma separated list of files to be distributed with the job.\n" +
- " --archives archive Comma separated list of archives to be distributed with the job."
+ " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+ " --files files Comma separated list of files to be distributed with the job.\n" +
+ " --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
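The parser above consumes flags by pattern matching on a List[String] and rebinding the remainder; a self-contained sketch of the same technique (a toy, not the actual ClientArguments class):

    // Toy version of the flag-parsing loop used by ClientArguments.
    object ParseDemo extends App {
      var addJars: String = null
      var args = List("--addJars", "a.jar,b.jar", "--queue", "default")
      while (args.nonEmpty) {
        args match {
          case "--addJars" :: value :: tail =>
            addJars = value
            args = tail
          case _ :: tail =>   // flags this toy does not model are skipped
            args = tail
        }
      }
      println(addJars)        // prints "a.jar,b.jar"
    }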
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index d340b114df..8dac9e02ac 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
- JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
@@ -215,15 +216,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- // If log4j present, ensure ours overrides all others
- if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
- // Which is correct ?
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
- }
-
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+ Environment.PWD.$() + Path.SEPARATOR + "*")
Client.populateHadoopClasspath(yarnConf, env)
// allow users to specify some environment variables
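The classpath change, made identically in Client.scala and WorkerRunnable.scala, drops the relative "./" and "./*" entries (and the literal "$CLASSPATH") in favor of Environment.PWD.$(), which the NodeManager expands per container when building the launch context. A minimal sketch of what the two added entries resolve to, assuming hadoop-yarn-api is on the classpath:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    // The container's working directory itself (picks up log4j.properties etc.)
    println(Environment.PWD.$())                          // "$PWD" on Linux
    // Every jar localized into that directory, including --addJars entries
    println(Environment.PWD.$() + Path.SEPARATOR + "*")   // "$PWD/*"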