diff options
Diffstat (limited to 'core/src/hadoop2-yarn/scala')
4 files changed, 73 insertions, 40 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala index ab1ab9d8a7..b96c047e10 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala @@ -27,18 +27,7 @@ object SparkHadoopUtil { } def runAsUser(func: (Product) => Unit, args: Product, user: String) { - - // println("running as user " + jobUserName) - - UserGroupInformation.setConfiguration(yarnConf) - val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(user) - appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] { - def run: AnyRef = { - func(args) - // no return value ... - null - } - }) + func(args) } // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index aa72c1e5fe..f19648ec68 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -30,23 +30,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e def run() { - // Initialization - val jobUserName = Utils.getUserNameFromEnvironment() - logInfo("running as user " + jobUserName) - - // run as user ... - UserGroupInformation.setConfiguration(yarnConf) - val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(jobUserName) - appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] { - def run: AnyRef = { - runImpl() - return null - } - }) - } - - private def runImpl() { - appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala index 7a881e26df..f20cc31c7c 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala @@ -1,9 +1,13 @@ package spark.deploy.yarn import java.net.{InetSocketAddress, URI} +import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.mapred.Master import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ @@ -23,6 +27,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + val credentials = UserGroupInformation.getCurrentUser().getCredentials(); def run() { init(yarnConf) @@ -41,7 +46,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setUser(args.amUser) - + submitApp(appContext) monitorApplication(appId) @@ -62,14 +67,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() - logInfo("Max mem capabililty of resources in this cluster " + maxMem) + logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) - // If the cluster does not have enough memory resources, exit. - val requestedMem = (args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + args.numWorkers * args.workerMemory - if (requestedMem > maxMem) { - logError("Cluster cannot satisfy memory resource request of " + requestedMem) + // if we have requested more then the clusters max for a single resource then exit. + if (args.workerMemory > maxMem) { + logError("the worker size is to large to run on this cluster " + args.workerMemory); + System.exit(1) + } + val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD + if (amMem > maxMem) { + logError("AM size is to large to run on this cluster " + amMem) System.exit(1) } + + // We could add checks to make sure the entire cluster has enough resources but that involves getting + // all the node reports and computing ourselves } def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = { @@ -86,6 +98,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Upload Spark and the application JAR to the remote file system // Add them as local resources to the AM val fs = FileSystem.get(conf) + + val delegTokenRenewer = Master.getMasterPrincipal(conf); + if (UserGroupInformation.isSecurityEnabled()) { + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + logError("Can't get Master Kerberos principal for use as renewer") + System.exit(1) + } + } + Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => val localPath: String = if (_localPath != null) _localPath.trim() else "" @@ -97,6 +118,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl fs.copyFromLocalFile(false, true, src, dst) val destStatus = fs.getFileStatus(dst) + // get tokens for anything we upload to hdfs + if (UserGroupInformation.isSecurityEnabled()) { + fs.addDelegationTokens(delegTokenRenewer, credentials); + } + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] amJarRsrc.setType(LocalResourceType.FILE) amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) @@ -106,6 +132,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl locaResources(destName) = amJarRsrc } } + UserGroupInformation.getCurrentUser().addCredentials(credentials); return locaResources } @@ -115,6 +142,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val env = new HashMap[String, String]() Apps.addToEnvironment(env, Environment.USER.name, args.amUser) + // set this so that UGI set to correct user in unsecure mode + Apps.addToEnvironment(env, "HADOOP_USER_NAME", args.amUser) // If log4j present, ensure ours overrides all others if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") @@ -195,7 +224,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - val commands = List[String]("java " + + val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " + " -server " + JAVA_OPTS + " spark.deploy.yarn.ApplicationMaster" + @@ -214,7 +243,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Memory for the ApplicationMaster capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) amContainer.setResource(capability) - + + // Setup security tokens + val dob = new DataOutputBuffer() + credentials.writeTokenStorageToStream(dob) + amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData())) + return amContainer } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala index a2bf0af762..e22d256a84 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala @@ -1,9 +1,12 @@ package spark.deploy.yarn import java.net.URI +import java.nio.ByteBuffer +import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ @@ -11,7 +14,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} +import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import scala.collection.JavaConversions._ @@ -76,7 +79,13 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S */ ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) - val commands = List[String]("java " + + + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + val dob = new DataOutputBuffer() + credentials.writeTokenStorageToStream(dob) + ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) + + val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " + " -server " + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state. @@ -145,6 +154,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S val env = new HashMap[String, String]() // should we add this ? Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment()) + // set this so that UGI set to correct user in unsecure mode + Apps.addToEnvironment(env, "HADOOP_USER_NAME", Utils.getUserNameFromEnvironment()) // If log4j present, ensure ours overrides all others if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { @@ -165,7 +176,23 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort() val cmAddress = NetUtils.createSocketAddr(cmHostPortStr) logInfo("Connecting to ContainerManager at " + cmHostPortStr) - return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] + + // use doAs and remoteUser here so we can add the container token and not + // pollute the current users credentials with all of the individual container tokens + val user = UserGroupInformation.createRemoteUser(container.getId().toString()); + val containerToken = container.getContainerToken(); + if (containerToken != null) { + user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress)) + } + + val proxy = user + .doAs(new PrivilegedExceptionAction[ContainerManager] { + def run: ContainerManager = { + return rpc.getProxy(classOf[ContainerManager], + cmAddress, conf).asInstanceOf[ContainerManager] + } + }); + return proxy; } } |