author    Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.(none)>    2013-07-02 21:18:59 -0500
committer Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.(none)>    2013-07-02 21:18:59 -0500
commit    923cf929003c67963e273fcdcd5b01baf68df8b5 (patch)
tree      7049909ae9f19e9a22382e584c43dd8e3c4f2a8b /core/src
parent    bad51c7cb4e15b68ce49ac4886e4631b9fb7e308 (diff)
Rework from pull request. Removed the --user option from the Spark on YARN Client, made the use of the JAVA_HOME environment
variable conditional on whether it is set, and created addCredentials in each of the SparkHadoopUtil classes so that credentials are only added when the profile is hadoop2-yarn.
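
In outline, the change moves credential handling behind a single per-profile hook and makes the container launch command tolerate a missing JAVA_HOME. Below is a minimal sketch of the two patterns, distilled from the hunks that follow; the object names are illustrative stand-ins for the per-profile SparkHadoopUtil source trees, and javaHomeRef stands in for YARN's Environment.JAVA_HOME.$() expansion used in the diff:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

// hadoop1 / hadoop2 profiles: a deliberate no-op, so callers can invoke
// addCredentials unconditionally without pulling in YARN security code.
object SparkHadoopUtilPlain {
  def addCredentials(conf: JobConf) {}
}

// hadoop2-yarn profile: merge the current user's tokens into the job conf
// so the job can authenticate against a secure (Kerberized) cluster.
object SparkHadoopUtilYarn {
  def addCredentials(conf: JobConf) {
    val jobCreds = conf.getCredentials()
    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
  }
}

object LaunchCommand {
  // Prefer $JAVA_HOME/bin/java, but fall back to whatever "java" is on the
  // PATH when the container environment does not define JAVA_HOME.
  def javaCommand(javaHomeRef: String): String = {
    val javaHome = System.getenv("JAVA_HOME")
    if (javaHome != null && !javaHome.isEmpty()) javaHomeRef + "/bin/java"
    else "java"
  }
}

Callers such as HadoopRDD can then invoke addCredentials unconditionally; which body actually runs is decided at build time by the hadoop1/hadoop2/hadoop2-yarn source trees.
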
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala           |  4
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala      |  7
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala          | 14
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala |  8
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala  | 12
-rw-r--r--  core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala           |  4
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala                    |  4
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                        |  8
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala                       |  2
-rw-r--r--  core/src/main/scala/spark/scheduler/InputFormatInfo.scala           | 11
10 files changed, 37 insertions(+), 37 deletions(-)
diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d..f1c86de4cc 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
package spark.deploy
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
/**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems
def newConfiguration(): Configuration = new Configuration()
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {}
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index b96c047e10..301a57fffa 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,6 +1,7 @@
package spark.deploy
import collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
@@ -49,4 +50,10 @@ object SparkHadoopUtil {
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems
// Always create a new config, don't reuse yarnConf.
def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {
+ val jobCreds = conf.getCredentials();
+ jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+ }
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index f20cc31c7c..514c17f241 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -45,7 +45,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(amContainer)
- appContext.setUser(args.amUser)
+ appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
submitApp(appContext)
@@ -141,9 +141,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
val env = new HashMap[String, String]()
- Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
- // set this so that UGI set to correct user in unsecure mode
- Apps.addToEnvironment(env, "HADOOP_USER_NAME", args.amUser)
// If log4j present, ensure ours overrides all others
if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
@@ -171,6 +168,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
+
// Add each SPARK-* key to the environment
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
@@ -224,7 +222,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
+ var javaCommand = "java";
+ val javaHome = System.getenv("JAVA_HOME")
+ if (javaHome != null && !javaHome.isEmpty()) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
" spark.deploy.yarn.ApplicationMaster" +
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
index 24110558e7..07e7edea36 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -13,7 +13,6 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amUser = System.getProperty("user.name")
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
// TODO
@@ -58,10 +57,6 @@ class ClientArguments(val args: Array[String]) {
workerCores = value
args = tail
- case ("--user") :: value :: tail =>
- amUser = value
- args = tail
-
case ("--queue") :: value :: tail =>
amQueue = value
args = tail
@@ -96,8 +91,7 @@ class ClientArguments(val args: Array[String]) {
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --user USERNAME Run the ApplicationMaster (and slaves) as a different user\n"
+ " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
)
System.exit(exitCode)
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
index e22d256a84..cc6f3344a1 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -85,7 +85,13 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
+ var javaCommand = "java";
+ val javaHome = System.getenv("JAVA_HOME")
+ if (javaHome != null && !javaHome.isEmpty()) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
@@ -152,10 +158,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- // should we add this ?
- Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
- // set this so that UGI set to correct user in unsecure mode
- Apps.addToEnvironment(env, "HADOOP_USER_NAME", Utils.getUserNameFromEnvironment())
// If log4j present, ensure ours overrides all others
if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d..f1c86de4cc 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
package spark.deploy
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
/**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems
def newConfiguration(): Configuration = new Configuration()
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {}
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index b397601184..c9d698fc09 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -614,10 +614,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None) {
- // make sure to propogate any credentials from the current user to the jobConf
- // for Hadoop security
- val jobCreds = conf.getCredentials();
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
conf.setOutputKeyClass(keyClass)
conf.setOutputValueClass(valueClass)
// conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c3a56938b5..6c37203707 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -295,10 +295,6 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
- // make sure to propogate any credentials from the current user to the jobConf
- // for Hadoop security
- val jobCreds = conf.getCredentials();
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
@@ -311,10 +307,6 @@ class SparkContext(
minSplits: Int = defaultMinSplits
) : RDD[(K, V)] = {
val conf = new JobConf(hadoopConfiguration)
- // make sure to propogate any credentials from the current user to the jobConf
- // for Hadoop security
- val jobCreds = conf.getCredentials();
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
FileInputFormat.setInputPaths(conf, path)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index cbf5512e24..07c103503c 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
+import spark.deploy.SparkHadoopUtil
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@@ -50,6 +51,7 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
+ SparkHadoopUtil.addCredentials(conf);
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 30a56d7135..17d0ea4f80 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -9,6 +9,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
+import spark.deploy.SparkHadoopUtil
/**
@@ -71,10 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
val conf = new JobConf(configuration)
- // make sure to propogate any credentials from the current user to the jobConf
- // for Hadoop security
- val jobCreds = conf.getCredentials();
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+ SparkHadoopUtil.addCredentials(conf);
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -94,10 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
val jobConf = new JobConf(configuration)
- // make sure to propogate any credentials from the current user to the jobConf
- // for Hadoop security
- val jobCreds = jobConf.getCredentials();
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+ SparkHadoopUtil.addCredentials(jobConf);
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =