diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-21 23:35:13 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-21 23:35:13 -0700 |
commit | b84193c5b86d60f36624ae29169cf62e489005af (patch) | |
tree | f6373f1ff3c25bc7bb1c3be20464c08ccf6c29ad | |
parent | 731c94e91d310eadb58ec5ff3338b430f10fdefb (diff) | |
parent | b6571541a6043ed12cb8af51e198e207968394a7 (diff) | |
download | spark-b84193c5b86d60f36624ae29169cf62e489005af.tar.gz spark-b84193c5b86d60f36624ae29169cf62e489005af.tar.bz2 spark-b84193c5b86d60f36624ae29169cf62e489005af.zip |
Merge pull request #92 from tgravescs/sparkYarnFixClasspath
Fix the Worker to use CoarseGrainedExecutorBackend and modify classpath ...
...to be explicit about inclusion of spark.jar and app.jar. Be explicit so if there are any conflicts in packaging between spark.jar and app.jar we don't get random results due to the classpath having /*, which can include things in a different order.
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 32 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala | 7 |
2 files changed, 29 insertions, 10 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8afb3e39cb..1a380ae714 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val env = new HashMap[String, String]() - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) - Apps.addToEnvironment(env, Environment.CLASSPATH.name, - Environment.PWD.$() + Path.SEPARATOR + "*") - - Client.populateHadoopClasspath(yarnConf, env) + Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = localResources("spark.jar").getResource().getScheme.toString() + "://" + @@ -451,4 +447,30 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim) } } + + def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + // If log4j present, ensure ours overrides all others + if (addLog4j) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "log4j.properties") + } + // normally the users app.jar is last in case conflicts with spark jars + val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false") + .toBoolean + if (userClasspathFirst) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "app.jar") + } + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "spark.jar") + Client.populateHadoopClasspath(conf, env) + + if (!userClasspathFirst) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "app.jar") + } + 
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "*") + } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 8dac9e02ac..ba352daac4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ? " -XX:OnOutOfMemoryError='kill %p' " + JAVA_OPTS + - " org.apache.spark.executor.StandaloneExecutorBackend " + + " org.apache.spark.executor.CoarseGrainedExecutorBackend " + masterAddress + " " + slaveId + " " + hostname + " " + @@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) - Apps.addToEnvironment(env, Environment.CLASSPATH.name, - Environment.PWD.$() + Path.SEPARATOR + "*") - Client.populateHadoopClasspath(yarnConf, env) + Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) // allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) |