path: root/core/src/hadoop2-yarn/scala
author    Thomas Graves <tgraves@thatenemy-lm.champ.corp.yahoo.com>  2013-06-19 11:18:42 -0500
committer Thomas Graves <tgraves@thatenemy-lm.champ.corp.yahoo.com>  2013-06-19 11:18:42 -0500
commit    75d78c7ac90ce717bf1009ec4d335fb4a6cfde24 (patch)
tree      56c92d7a2a667c5e77eec864efe4373a8dcea728 /core/src/hadoop2-yarn/scala
parent    84530ba6d9fa47ee2863bb50c23742ecfa2a6a64 (diff)
Add support for Spark on Yarn on a secure Hadoop cluster
Diffstat (limited to 'core/src/hadoop2-yarn/scala')
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala         | 13
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala  | 17
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala             | 50
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala     | 33
4 files changed, 73 insertions(+), 40 deletions(-)
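Taken together, the change works like this: the client collects the submitting user's Hadoop credentials (adding HDFS delegation tokens for the jars it uploads), serializes them, and ships them in the ApplicationMaster's container launch context; workers are launched the same way, and each connection to a ContainerManager is made under a throwaway UGI that carries only that container's token. A minimal sketch of the shared serialization step, assuming the hadoop2-yarn (2.0.x alpha) API this branch builds against; the helper name attachCurrentUserTokens is invented for illustration:

import java.nio.ByteBuffer
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

// Collect the current user's tokens, serialize them, and attach them to a container
// launch context -- the pattern Client.scala and WorkerRunnable.scala both use below.
def attachCurrentUserTokens(ctx: ContainerLaunchContext) {
  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
}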
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index ab1ab9d8a7..b96c047e10 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -27,18 +27,7 @@ object SparkHadoopUtil {
}
def runAsUser(func: (Product) => Unit, args: Product, user: String) {
-
- // println("running as user " + jobUserName)
-
- UserGroupInformation.setConfiguration(yarnConf)
- val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(user)
- appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
- def run: AnyRef = {
- func(args)
- // no return value ...
- null
- }
- })
+ func(args)
}
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
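The wrapper removed here (and from ApplicationMaster below) created a remote-user UGI and ran the job inside doAs. On a secure cluster that UGI would not carry the login user's Kerberos credentials or delegation tokens, so the indirection is dropped; in insecure mode the launch environment instead sets HADOOP_USER_NAME (see the env changes in Client.scala and WorkerRunnable.scala), which UserGroupInformation honours when security is off. A hypothetical check of what the container now sees, using only standard UserGroupInformation calls:

import org.apache.hadoop.security.UserGroupInformation

// With security off, the UGI picks up HADOOP_USER_NAME from the environment;
// with security on, the identity comes from the credentials shipped with the container.
val ugi = UserGroupInformation.getCurrentUser()
println("running as " + ugi.getShortUserName() +
  ", security enabled: " + UserGroupInformation.isSecurityEnabled())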
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index aa72c1e5fe..f19648ec68 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -30,23 +30,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
def run() {
- // Initialization
- val jobUserName = Utils.getUserNameFromEnvironment()
- logInfo("running as user " + jobUserName)
-
- // run as user ...
- UserGroupInformation.setConfiguration(yarnConf)
- val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(jobUserName)
- appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
- def run: AnyRef = {
- runImpl()
- return null
- }
- })
- }
-
- private def runImpl() {
-
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index 7a881e26df..f20cc31c7c 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -1,9 +1,13 @@
package spark.deploy.yarn
import java.net.{InetSocketAddress, URI}
+import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -23,6 +27,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials();
def run() {
init(yarnConf)
@@ -41,7 +46,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(amContainer)
appContext.setUser(args.amUser)
-
+
submitApp(appContext)
monitorApplication(appId)
@@ -62,14 +67,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
- logInfo("Max mem capabililty of resources in this cluster " + maxMem)
+ logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
- // If the cluster does not have enough memory resources, exit.
- val requestedMem = (args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + args.numWorkers * args.workerMemory
- if (requestedMem > maxMem) {
- logError("Cluster cannot satisfy memory resource request of " + requestedMem)
+ // If we have requested more than the cluster's max for a single resource then exit.
+ if (args.workerMemory > maxMem) {
+ logError("the worker size is too large to run on this cluster " + args.workerMemory);
+ System.exit(1)
+ }
+ val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ if (amMem > maxMem) {
+ logError("AM size is to large to run on this cluster " + amMem)
System.exit(1)
}
+
+ // We could add checks to make sure the entire cluster has enough resources, but that
+ // involves getting all the node reports and computing it ourselves
}
def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
@@ -86,6 +98,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Upload Spark and the application JAR to the remote file system
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
+
+ val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ logError("Can't get Master Kerberos principal for use as renewer")
+ System.exit(1)
+ }
+ }
+
Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -97,6 +118,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
fs.copyFromLocalFile(false, true, src, dst)
val destStatus = fs.getFileStatus(dst)
+ // Get delegation tokens for anything we upload to HDFS
+ if (UserGroupInformation.isSecurityEnabled()) {
+ fs.addDelegationTokens(delegTokenRenewer, credentials);
+ }
+
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(LocalResourceType.FILE)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
@@ -106,6 +132,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
locaResources(destName) = amJarRsrc
}
}
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
return locaResources
}
@@ -115,6 +142,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
+ // Set this so that the UGI is set to the correct user in insecure mode
+ Apps.addToEnvironment(env, "HADOOP_USER_NAME", args.amUser)
// If log4j present, ensure ours overrides all others
if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
@@ -195,7 +224,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- val commands = List[String]("java " +
+ val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
" -server " +
JAVA_OPTS +
" spark.deploy.yarn.ApplicationMaster" +
@@ -214,7 +243,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Memory for the ApplicationMaster
capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
amContainer.setResource(capability)
-
+
+ // Setup security tokens
+ val dob = new DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+ amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
return amContainer
}
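On the client side the security-sensitive steps are: pick a renewer principal the cluster is allowed to use, fetch HDFS delegation tokens for the filesystem that holds the uploaded jars, and ship the resulting credentials with the AM container (the writeTokenStorageToStream block above). A condensed sketch of that flow, reusing the same Hadoop calls as the diff; the helper name collectHdfsTokens is invented for illustration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.UserGroupInformation

def collectHdfsTokens(conf: Configuration) {
  if (UserGroupInformation.isSecurityEnabled()) {
    // The renewer must be a principal that is allowed to renew the tokens (the master's).
    val renewer = Master.getMasterPrincipal(conf)
    require(renewer != null && renewer.nonEmpty,
      "Can't get Master Kerberos principal for use as renewer")
    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    // Obtain delegation tokens for the filesystem the jars are copied to.
    FileSystem.get(conf).addDelegationTokens(renewer, credentials)
    // Make the new tokens visible to everything else running as this user.
    UserGroupInformation.getCurrentUser().addCredentials(credentials)
  }
}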
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
index a2bf0af762..e22d256a84 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -1,9 +1,12 @@
package spark.deploy.yarn
import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
@@ -11,7 +14,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import scala.collection.JavaConversions._
@@ -76,7 +79,13 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
*/
ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
- val commands = List[String]("java " +
+
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+ val dob = new DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+ ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+ val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
@@ -145,6 +154,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
val env = new HashMap[String, String]()
// should we add this ?
Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
+ // Set this so that the UGI is set to the correct user in insecure mode
+ Apps.addToEnvironment(env, "HADOOP_USER_NAME", Utils.getUserNameFromEnvironment())
// If log4j present, ensure ours overrides all others
if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
@@ -165,7 +176,23 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
logInfo("Connecting to ContainerManager at " + cmHostPortStr)
- return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
+
+ // Use doAs and a remote-user UGI here so we can add the container token and not
+ // pollute the current user's credentials with all of the individual container tokens
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString());
+ val containerToken = container.getContainerToken();
+ if (containerToken != null) {
+ user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+ }
+
+ val proxy = user
+ .doAs(new PrivilegedExceptionAction[ContainerManager] {
+ def run: ContainerManager = {
+ return rpc.getProxy(classOf[ContainerManager],
+ cmAddress, conf).asInstanceOf[ContainerManager]
+ }
+ });
+ return proxy;
}
}
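Once the NodeManager launches the AM or a worker with these serialized tokens, Hadoop makes them available to the child JVM (via the token file referenced by HADOOP_TOKEN_FILE_LOCATION), so UserGroupInformation.getCurrentUser() in that process already holds them; the per-container doAs above exists only so the ContainerManager RPC presents the container token without adding it to that long-lived user. A hypothetical debugging snippet for listing what a launched container received:

import scala.collection.JavaConversions._
import org.apache.hadoop.security.UserGroupInformation

// Enumerate the tokens shipped to this container.
val ugi = UserGroupInformation.getCurrentUser()
for (token <- ugi.getTokens()) {
  println("token kind=" + token.getKind() + " service=" + token.getService())
}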