diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-07-16 17:30:15 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-07-16 17:30:15 -0700 |
commit | 87d586e4da63e6e1875d9cac194c6f11e1cdc653 (patch) | |
tree | 3b2ed3203b6abb94a3a853a7b95dfebaa67665a1 /core/src/hadoop2-yarn/scala | |
parent | d733527bb4dad14b276b4f56b1ff5c3ee1cb7f75 (diff) | |
parent | 4ff494de20c36151dc29a60825d67e094d14acd4 (diff) | |
download | spark-87d586e4da63e6e1875d9cac194c6f11e1cdc653.tar.gz spark-87d586e4da63e6e1875d9cac194c6f11e1cdc653.tar.bz2 spark-87d586e4da63e6e1875d9cac194c6f11e1cdc653.zip |
Merge remote-tracking branch 'old/master'
Diffstat (limited to 'core/src/hadoop2-yarn/scala')
10 files changed, 301 insertions, 69 deletions
diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala index 875c0a220b..0f972b7a0b 100644 --- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -1,4 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.mapred import org.apache.hadoop.mapreduce.TaskType diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index 8bc6fb6dea..1a7cdf4788 100644 --- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.mapreduce import org.apache.hadoop.conf.Configuration diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala index ab1ab9d8a7..6122fdced0 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,6 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy import collection.mutable.HashMap +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration @@ -27,18 +45,7 @@ object SparkHadoopUtil { } def runAsUser(func: (Product) => Unit, args: Product, user: String) { - - // println("running as user " + jobUserName) - - UserGroupInformation.setConfiguration(yarnConf) - val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(user) - appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] { - def run: AnyRef = { - func(args) - // no return value ... - null - } - }) + func(args) } // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. @@ -60,4 +67,10 @@ object SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems // Always create a new config, dont reuse yarnConf. def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) + + // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + def addCredentials(conf: JobConf) { + val jobCreds = conf.getCredentials(); + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) + } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index aa72c1e5fe..1b06169739 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy.yarn import java.net.Socket @@ -27,26 +44,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private var yarnAllocator: YarnAllocationHandler = null + private var isFinished:Boolean = false def run() { - // Initialization - val jobUserName = Utils.getUserNameFromEnvironment() - logInfo("running as user " + jobUserName) - - // run as user ... - UserGroupInformation.setConfiguration(yarnConf) - val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(jobUserName) - appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] { - def run: AnyRef = { - runImpl() - return null - } - }) - } - - private def runImpl() { - appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() @@ -85,10 +86,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Wait for the user class to Finish userThread.join() - - // Finish the ApplicationMaster - finishApplicationMaster() - // TODO: Exit based on success/failure + System.exit(0) } @@ -141,17 +139,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } } - + private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader) .getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { - // Copy - var mainArgs: Array[String] = new Array[String](args.userArgs.size()) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) - mainMethod.invoke(null, mainArgs) + var successed = false + try { + // Copy + var mainArgs: Array[String] = new Array[String](args.userArgs.size()) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) + mainMethod.invoke(null, mainArgs) + // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR + // userThread will stop here unless it has uncaught exception thrown out + // It need shutdown hook to set SUCCEEDED + successed = true + } finally { + if (successed) { + ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) + } else { + ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED) + } + } } } t.start() @@ -196,7 +207,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e logInfo("All workers have launched.") // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout - if (userThread.isAlive){ + if (userThread.isAlive) { // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) @@ -214,7 +225,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e val t = new Thread { override def run() { - while (userThread.isAlive){ + while (userThread.isAlive) { val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning if (missingWorkerCount > 0) { logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") @@ -252,14 +263,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } */ - - def finishApplicationMaster() { + + def finishApplicationMaster(status: FinalApplicationStatus) { + + synchronized { + if (isFinished) { + return + } + isFinished = true + } + + logInfo("finishApplicationMaster with " + status) val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) - // TODO: Check if the application has failed or succeeded - finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED) + finishReq.setFinishApplicationStatus(status) resourceManager.finishApplicationMaster(finishReq) + } } @@ -273,7 +293,7 @@ object ApplicationMaster { private val ALLOCATOR_LOOP_WAIT_COUNT = 30 def incrementAllocatorLoop(by: Int) { val count = yarnAllocatorLoop.getAndAdd(by) - if (count >= ALLOCATOR_LOOP_WAIT_COUNT){ + if (count >= ALLOCATOR_LOOP_WAIT_COUNT) { yarnAllocatorLoop.synchronized { // to wake threads off wait ... yarnAllocatorLoop.notifyAll() @@ -308,14 +328,16 @@ object ApplicationMaster { logInfo("Invoking sc stop from shutdown hook") sc.stop() // best case ... - for (master <- applicationMasters) master.finishApplicationMaster + for (master <- applicationMasters) { + master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) + } } } ) } // Wait for initialization to complete and atleast 'some' nodes can get allocated yarnAllocatorLoop.synchronized { - while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){ + while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) { yarnAllocatorLoop.wait(1000L) } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala index 1b00208511..8de44b1f66 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy.yarn import spark.util.IntParam diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala index 7a881e26df..8bcbfc2735 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala @@ -1,9 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy.yarn import java.net.{InetSocketAddress, URI} +import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.mapred.Master import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ @@ -23,6 +44,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + val credentials = UserGroupInformation.getCurrentUser().getCredentials(); def run() { init(yarnConf) @@ -40,8 +62,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) - appContext.setUser(args.amUser) - + appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) + submitApp(appContext) monitorApplication(appId) @@ -62,14 +84,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() - logInfo("Max mem capabililty of resources in this cluster " + maxMem) + logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) - // If the cluster does not have enough memory resources, exit. - val requestedMem = (args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + args.numWorkers * args.workerMemory - if (requestedMem > maxMem) { - logError("Cluster cannot satisfy memory resource request of " + requestedMem) + // if we have requested more then the clusters max for a single resource then exit. + if (args.workerMemory > maxMem) { + logError("the worker size is to large to run on this cluster " + args.workerMemory); + System.exit(1) + } + val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD + if (amMem > maxMem) { + logError("AM size is to large to run on this cluster " + amMem) System.exit(1) } + + // We could add checks to make sure the entire cluster has enough resources but that involves getting + // all the node reports and computing ourselves } def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = { @@ -86,6 +115,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Upload Spark and the application JAR to the remote file system // Add them as local resources to the AM val fs = FileSystem.get(conf) + + val delegTokenRenewer = Master.getMasterPrincipal(conf); + if (UserGroupInformation.isSecurityEnabled()) { + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + logError("Can't get Master Kerberos principal for use as renewer") + System.exit(1) + } + } + Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => val localPath: String = if (_localPath != null) _localPath.trim() else "" @@ -97,6 +135,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl fs.copyFromLocalFile(false, true, src, dst) val destStatus = fs.getFileStatus(dst) + // get tokens for anything we upload to hdfs + if (UserGroupInformation.isSecurityEnabled()) { + fs.addDelegationTokens(delegTokenRenewer, credentials); + } + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] amJarRsrc.setType(LocalResourceType.FILE) amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) @@ -106,6 +149,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl locaResources(destName) = amJarRsrc } } + UserGroupInformation.getCurrentUser().addCredentials(credentials); return locaResources } @@ -114,7 +158,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null) val env = new HashMap[String, String]() - Apps.addToEnvironment(env, Environment.USER.name, args.amUser) // If log4j present, ensure ours overrides all others if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") @@ -142,6 +185,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() } + // Add each SPARK-* key to the environment System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } return env @@ -195,7 +239,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - val commands = List[String]("java " + + var javaCommand = "java"; + val javaHome = System.getenv("JAVA_HOME") + if (javaHome != null && !javaHome.isEmpty()) { + javaCommand = Environment.JAVA_HOME.$() + "/bin/java" + } + + val commands = List[String](javaCommand + " -server " + JAVA_OPTS + " spark.deploy.yarn.ApplicationMaster" + @@ -214,7 +264,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Memory for the ApplicationMaster capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) amContainer.setResource(capability) - + + // Setup security tokens + val dob = new DataOutputBuffer() + credentials.writeTokenStorageToStream(dob) + amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData())) + return amContainer } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala index 24110558e7..67aff03781 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy.yarn import spark.util.MemoryParam @@ -13,7 +30,6 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 var workerCores = 1 var numWorkers = 2 - var amUser = System.getProperty("user.name") var amQueue = System.getProperty("QUEUE", "default") var amMemory: Int = 512 // TODO @@ -58,10 +74,6 @@ class ClientArguments(val args: Array[String]) { workerCores = value args = tail - case ("--user") :: value :: tail => - amUser = value - args = tail - case ("--queue") :: value :: tail => amQueue = value args = tail @@ -96,8 +108,7 @@ class ClientArguments(val args: Array[String]) { " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + - " --user USERNAME Run the ApplicationMaster (and slaves) as a different user\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" ) System.exit(exitCode) } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala index a2bf0af762..f458f2f6a1 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala @@ -1,9 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy.yarn import java.net.URI +import java.nio.ByteBuffer +import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ @@ -11,7 +31,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} +import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import scala.collection.JavaConversions._ @@ -76,7 +96,19 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S */ ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) - val commands = List[String]("java " + + + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + val dob = new DataOutputBuffer() + credentials.writeTokenStorageToStream(dob) + ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) + + var javaCommand = "java"; + val javaHome = System.getenv("JAVA_HOME") + if (javaHome != null && !javaHome.isEmpty()) { + javaCommand = Environment.JAVA_HOME.$() + "/bin/java" + } + + val commands = List[String](javaCommand + " -server " + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state. @@ -143,8 +175,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - // should we add this ? - Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment()) // If log4j present, ensure ours overrides all others if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { @@ -165,7 +195,23 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort() val cmAddress = NetUtils.createSocketAddr(cmHostPortStr) logInfo("Connecting to ContainerManager at " + cmHostPortStr) - return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] + + // use doAs and remoteUser here so we can add the container token and not + // pollute the current users credentials with all of the individual container tokens + val user = UserGroupInformation.createRemoteUser(container.getId().toString()); + val containerToken = container.getContainerToken(); + if (containerToken != null) { + user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress)) + } + + val proxy = user + .doAs(new PrivilegedExceptionAction[ContainerManager] { + def run: ContainerManager = { + return rpc.getProxy(classOf[ContainerManager], + cmAddress, conf).asInstanceOf[ContainerManager] + } + }); + return proxy; } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala index 61dd72a651..b0af8baf08 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.deploy.yarn import spark.{Logging, Utils} diff --git a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala index ed732d36bf..307d96111c 100644 --- a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.scheduler.cluster import spark._ |