aboutsummaryrefslogtreecommitdiff
path: root/core/src/hadoop2-yarn/scala
diff options
context:
space:
mode:
authorY.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.(none)>2013-07-02 21:18:59 -0500
committerY.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.(none)>2013-07-02 21:18:59 -0500
commit923cf929003c67963e273fcdcd5b01baf68df8b5 (patch)
tree7049909ae9f19e9a22382e584c43dd8e3c4f2a8b /core/src/hadoop2-yarn/scala
parentbad51c7cb4e15b68ce49ac4886e4631b9fb7e308 (diff)
downloadspark-923cf929003c67963e273fcdcd5b01baf68df8b5.tar.gz
spark-923cf929003c67963e273fcdcd5b01baf68df8b5.tar.bz2
spark-923cf929003c67963e273fcdcd5b01baf68df8b5.zip
Rework from pull request. Removed --user option from Spark on Yarn Client, made the user of JAVA_HOME environment
variable conditional on if its set, and created addCredentials in each of the SparkHadoopUtil classes to only add the credentials when the profile is hadoop2-yarn.
Diffstat (limited to 'core/src/hadoop2-yarn/scala')
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala7
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala14
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala8
-rw-r--r--core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala12
4 files changed, 24 insertions, 17 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index b96c047e10..301a57fffa 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,6 +1,7 @@
package spark.deploy
import collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
@@ -49,4 +50,10 @@ object SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Always create a new config, dont reuse yarnConf.
def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {
+ val jobCreds = conf.getCredentials();
+ jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+ }
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index f20cc31c7c..514c17f241 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -45,7 +45,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(amContainer)
- appContext.setUser(args.amUser)
+ appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
submitApp(appContext)
@@ -141,9 +141,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
val env = new HashMap[String, String]()
- Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
- // set this so that UGI set to correct user in unsecure mode
- Apps.addToEnvironment(env, "HADOOP_USER_NAME", args.amUser)
// If log4j present, ensure ours overrides all others
if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
@@ -171,6 +168,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
+
// Add each SPARK-* key to the environment
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
@@ -224,7 +222,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
+ var javaCommand = "java";
+ val javaHome = System.getenv("JAVA_HOME")
+ if (javaHome != null && !javaHome.isEmpty()) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
" spark.deploy.yarn.ApplicationMaster" +
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
index 24110558e7..07e7edea36 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -13,7 +13,6 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amUser = System.getProperty("user.name")
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
// TODO
@@ -58,10 +57,6 @@ class ClientArguments(val args: Array[String]) {
workerCores = value
args = tail
- case ("--user") :: value :: tail =>
- amUser = value
- args = tail
-
case ("--queue") :: value :: tail =>
amQueue = value
args = tail
@@ -96,8 +91,7 @@ class ClientArguments(val args: Array[String]) {
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --user USERNAME Run the ApplicationMaster (and slaves) as a different user\n"
+ " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
)
System.exit(exitCode)
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
index e22d256a84..cc6f3344a1 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -85,7 +85,13 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
+ var javaCommand = "java";
+ val javaHome = System.getenv("JAVA_HOME")
+ if (javaHome != null && !javaHome.isEmpty()) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
@@ -152,10 +158,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- // should we add this ?
- Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
- // set this so that UGI set to correct user in unsecure mode
- Apps.addToEnvironment(env, "HADOOP_USER_NAME", Utils.getUserNameFromEnvironment())
// If log4j present, ensure ours overrides all others
if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {