aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authortgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-10-21 14:05:15 -0500
committertgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-10-21 14:05:15 -0500
commitb6571541a6043ed12cb8af51e198e207968394a7 (patch)
tree19690365532e94123334564d63aa046b62dc517f /yarn
parent35886f347466b25625d5391c97c2deb8293ebc66 (diff)
downloadspark-b6571541a6043ed12cb8af51e198e207968394a7.tar.gz
spark-b6571541a6043ed12cb8af51e198e207968394a7.tar.bz2
spark-b6571541a6043ed12cb8af51e198e207968394a7.zip
Fix the Worker to use CoarseGrainedExecutorBackend and modify classpath to be explicit
about inclusion of spark.jar and app.jar
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala32
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala7
2 files changed, 29 insertions, 10 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afb3e39cb..1a380ae714 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
- Apps.addToEnvironment(env, Environment.CLASSPATH.name,
- Environment.PWD.$() + Path.SEPARATOR + "*")
-
- Client.populateHadoopClasspath(yarnConf, env)
+ Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
localResources("spark.jar").getResource().getScheme.toString() + "://" +
@@ -451,4 +447,30 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
}
}
+
+ def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ // If log4j present, ensure ours overrides all others
+ if (addLog4j) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "log4j.properties")
+ }
+ // normally the users app.jar is last in case conflicts with spark jars
+ val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+ .toBoolean
+ if (userClasspathFirst) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "app.jar")
+ }
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "spark.jar")
+ Client.populateHadoopClasspath(conf, env)
+
+ if (!userClasspathFirst) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "app.jar")
+ }
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "*")
+ }
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 8dac9e02ac..ba352daac4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
" -XX:OnOutOfMemoryError='kill %p' " +
JAVA_OPTS +
- " org.apache.spark.executor.StandaloneExecutorBackend " +
+ " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
masterAddress + " " +
slaveId + " " +
hostname + " " +
@@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
- Apps.addToEnvironment(env, Environment.CLASSPATH.name,
- Environment.PWD.$() + Path.SEPARATOR + "*")
- Client.populateHadoopClasspath(yarnConf, env)
+ Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))