From b6571541a6043ed12cb8af51e198e207968394a7 Mon Sep 17 00:00:00 2001
From: tgravescs
Date: Mon, 21 Oct 2013 14:05:15 -0500
Subject: Fix the Worker to use CoarseGrainedExecutorBackend and modify
 classpath to be explicit about inclusion of spark.jar and app.jar

---
 .../org/apache/spark/deploy/yarn/Client.scala      | 32 ++++++++++++++++++----
 .../apache/spark/deploy/yarn/WorkerRunnable.scala  |  7 ++---
 2 files changed, 29 insertions(+), 10 deletions(-)

(limited to 'yarn')

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afb3e39cb..1a380ae714 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
@@ -451,4 +447,30 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
   }
+
+  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    // If log4j present, ensure ours overrides all others
+    if (addLog4j) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+        Path.SEPARATOR + "log4j.properties")
+    }
+    // normally the users app.jar is last in case conflicts with spark jars
+    val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+      .toBoolean
+    if (userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+      Path.SEPARATOR + "spark.jar")
+    Client.populateHadoopClasspath(conf, env)
+
+    if (!userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+      Path.SEPARATOR + "*")
+  }
 }

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 8dac9e02ac..ba352daac4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
       " -XX:OnOutOfMemoryError='kill %p' " +
       JAVA_OPTS +
-      " org.apache.spark.executor.StandaloneExecutorBackend " +
+      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
       masterAddress + " " +
       slaveId + " " +
       hostname + " " +
@@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-- 
cgit v1.2.3