diff options
Diffstat (limited to 'yarn')
8 files changed, 106 insertions, 422 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala deleted file mode 100644 index 9ff02046de..0000000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.yarn - -import java.security.PrivilegedExceptionAction -import java.util.concurrent.{Executors, TimeUnit} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.deploy.SparkHadoopUtil - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.util.ThreadUtils - -/* - * The following methods are primarily meant to make sure long-running apps like Spark - * Streaming apps can run without interruption while writing to secure HDFS. The - * scheduleLoginFromKeytab method is called on the driver when the - * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC - * once 75% of the renewal interval of the original delegation tokens used for the container - * has elapsed. It then creates new delegation tokens and writes them to HDFS in a - * pre-specified location - the prefix of which is specified in the sparkConf by - * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes - * to a new file, with a monotonically increasing suffix). After this, the credentials are - * updated once 75% of the new tokens renewal interval has elapsed. - * - * On the executor side, the updateCredentialsIfRequired method is called once 80% of the - * validity of the original tokens has elapsed. At that time the executor finds the - * credentials file with the latest timestamp and checks if it has read those credentials - * before (by keeping track of the suffix of the last file it read). If a new file has - * appeared, it will read the credentials and update the currently running UGI with it. This - * process happens again once 80% of the validity of this has expired. - */ -private[yarn] class AMDelegationTokenRenewer( - sparkConf: SparkConf, - hadoopConf: Configuration) extends Logging { - - private var lastCredentialsFileSuffix = 0 - - private val delegationTokenRenewer = - Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread")) - - private val hadoopUtil = YarnSparkHadoopUtil.get - - private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5) - private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5) - - /** - * Schedule a login from the keytab and principal set using the --principal and --keytab - * arguments to spark-submit. This login happens only when the credentials of the current user - * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from - * SparkConf to do the login. This method is a no-op in non-YARN mode. - * - */ - private[spark] def scheduleLoginFromKeytab(): Unit = { - val principal = sparkConf.get("spark.yarn.principal") - val keytab = sparkConf.get("spark.yarn.keytab") - - /** - * Schedule re-login and creation of new tokens. If tokens have already expired, this method - * will synchronously create new ones. - */ - def scheduleRenewal(runnable: Runnable): Unit = { - val credentials = UserGroupInformation.getCurrentUser.getCredentials - val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials) - // Run now! - if (renewalInterval <= 0) { - logInfo("HDFS tokens have expired, creating new tokens now.") - runnable.run() - } else { - logInfo(s"Scheduling login from keytab in $renewalInterval millis.") - delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS) - } - } - - // This thread periodically runs on the driver to update the delegation tokens on HDFS. - val driverTokenRenewerRunnable = - new Runnable { - override def run(): Unit = { - try { - writeNewTokensToHDFS(principal, keytab) - cleanupOldFiles() - } catch { - case e: Exception => - // Log the error and try to write new tokens back in an hour - logWarning("Failed to write out new credentials to HDFS, will try again in an " + - "hour! If this happens too often tasks will fail.", e) - delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS) - return - } - scheduleRenewal(this) - } - } - // Schedule update of credentials. This handles the case of updating the tokens right now - // as well, since the renenwal interval will be 0, and the thread will get scheduled - // immediately. - scheduleRenewal(driverTokenRenewerRunnable) - } - - // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At - // least numFilesToKeep files are kept for safety - private def cleanupOldFiles(): Unit = { - import scala.concurrent.duration._ - try { - val remoteFs = FileSystem.get(hadoopConf) - val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file")) - val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis - hadoopUtil.listFilesSorted( - remoteFs, credentialsPath.getParent, - credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - .dropRight(numFilesToKeep) - .takeWhile(_.getModificationTime < thresholdTime) - .foreach(x => remoteFs.delete(x.getPath, true)) - } catch { - // Such errors are not fatal, so don't throw. Make sure they are logged though - case e: Exception => - logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " + - "warnings there may be an issue with your HDFS cluster.") - } - } - - private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = { - // Keytab is copied by YARN to the working directory of the AM, so full path is - // not needed. - - // HACK: - // HDFS will not issue new delegation tokens, if the Credentials object - // passed in already has tokens for that FS even if the tokens are expired (it really only - // checks if there are tokens for the service, and not if they are valid). So the only real - // way to get new tokens is to make sure a different Credentials object is used each time to - // get new tokens and then the new tokens are copied over the the current user's Credentials. - // So: - // - we login as a different user and get the UGI - // - use that UGI to get the tokens (see doAs block below) - // - copy the tokens over to the current user's credentials (this will overwrite the tokens - // in the current user's Credentials object for this FS). - // The login to KDC happens each time new tokens are required, but this is rare enough to not - // have to worry about (like once every day or so). This makes this code clearer than having - // to login and then relogin every time (the HDFS API may not relogin since we don't use this - // UGI directly for HDFS communication. - logInfo(s"Attempting to login to KDC using principal: $principal") - val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) - logInfo("Successfully logged into KDC.") - val tempCreds = keytabLoggedInUGI.getCredentials - val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file")) - val dst = credentialsPath.getParent - keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] { - // Get a copy of the credentials - override def run(): Void = { - val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst - hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds) - null - } - }) - // Add the temp credentials back to the original ones. - UserGroupInformation.getCurrentUser.addCredentials(tempCreds) - val remoteFs = FileSystem.get(hadoopConf) - // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM - // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file - // and update the lastCredentialsFileSuffix. - if (lastCredentialsFileSuffix == 0) { - hadoopUtil.listFilesSorted( - remoteFs, credentialsPath.getParent, - credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - .lastOption.foreach { status => - lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath) - } - } - val nextSuffix = lastCredentialsFileSuffix + 1 - val tokenPathStr = - sparkConf.get("spark.yarn.credentials.file") + - SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix - val tokenPath = new Path(tokenPathStr) - val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - logInfo("Writing out delegation tokens to " + tempTokenPath.toString) - val credentials = UserGroupInformation.getCurrentUser.getCredentials - credentials.writeTokenStorageFile(tempTokenPath, hadoopConf) - logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr") - remoteFs.rename(tempTokenPath, tokenPath) - logInfo("Delegation token file rename complete.") - lastCredentialsFileSuffix = nextSuffix - } - - def stop(): Unit = { - delegationTokenRenewer.shutdown() - } -} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 82f2b7e1dd..70cb57ffd8 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -75,8 +75,6 @@ private[spark] class ApplicationMaster( // Fields used in cluster mode. private val sparkContextRef = new AtomicReference[SparkContext](null) - private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None - final def run(): Int = { try { val appAttemptId = client.getAttemptId() @@ -127,15 +125,6 @@ private[spark] class ApplicationMaster( // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) - // If the credentials file config is present, we must periodically renew tokens. So create - // a new AMDelegationTokenRenewer - if (sparkConf.contains("spark.yarn.credentials.file")) { - delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf)) - // If a principal and keytab have been set, use that to create new credentials for executors - // periodically - delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab()) - } - if (isClusterMode) { runDriver(securityMgr) } else { @@ -200,7 +189,6 @@ private[spark] class ApplicationMaster( logDebug("shutting down user thread") userClassThread.interrupt() } - if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop()) } } } @@ -247,12 +235,12 @@ private[spark] class ApplicationMaster( host: String, port: String, isClusterMode: Boolean): Unit = { - val driverEndpoint = rpcEnv.setupEndpointRef( + val driverEndpont = rpcEnv.setupEndpointRef( SparkEnv.driverActorSystemName, RpcAddress(host, port.toInt), YarnSchedulerBackend.ENDPOINT_NAME) amEndpoint = - rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode)) + rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, isClusterMode)) } private def runDriver(securityMgr: SecurityManager): Unit = { @@ -506,7 +494,6 @@ private[spark] class ApplicationMaster( override def onStart(): Unit = { driver.send(RegisterClusterManager(self)) - } override def receive: PartialFunction[Any, Unit] = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index bb126a4637..4abcf7307a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,11 +17,9 @@ package org.apache.spark.deploy.yarn -import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream} +import java.io.{File, FileOutputStream} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer -import java.security.PrivilegedExceptionAction -import java.util.UUID import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConversions._ @@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.Master import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{TokenIdentifier, Token} @@ -51,8 +50,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils private[spark] class Client( @@ -70,13 +69,11 @@ private[spark] class Client( private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) - private var credentials: Credentials = null + private val credentials = UserGroupInformation.getCurrentUser.getCredentials private val amMemoryOverhead = args.amMemoryOverhead // MB private val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode - - private var loginFromKeytab = false private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) @@ -91,8 +88,6 @@ private[spark] class Client( * available in the alpha API. */ def submitApplication(): ApplicationId = { - // Setup the credentials before doing anything else, so we have don't have issues at any point. - setupCredentials() yarnClient.init(yarnConf) yarnClient.start() @@ -224,12 +219,12 @@ private[spark] class Client( // and add them as local resources to the application master. val fs = FileSystem.get(hadoopConf) val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst - YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) + val nns = getNameNodesToAccess(sparkConf) + dst // Used to keep track of URIs added to the distributed cache. If the same URI is added // multiple times, YARN will fail to launch containers for the app with an internal // error. val distributedUris = new HashSet[String] + obtainTokensForNamenodes(nns, hadoopConf, credentials) obtainTokenForHiveMetastore(hadoopConf, credentials) obtainTokenForHBase(hadoopConf, credentials) @@ -248,20 +243,6 @@ private[spark] class Client( "for alternatives.") } - // If we passed in a keytab, make sure we copy the keytab to the staging directory on - // HDFS, and setup the relevant environment vars, so the AM can login again. - if (loginFromKeytab) { - logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + - " via the YARN Secure Distributed Cache.") - val localUri = new URI(args.keytab) - val localPath = getQualifiedLocalPath(localUri, hadoopConf) - val destinationPath = copyFileToRemote(dst, localPath, replication) - val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf) - distCacheMgr.addResource( - destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE, - sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true) - } - def addDistributedUri(uri: URI): Boolean = { val uriStr = uri.toString() if (distributedUris.contains(uriStr)) { @@ -405,28 +386,6 @@ private[spark] class Client( } /** - * Get the renewal interval for tokens. - */ - private def getTokenRenewalInterval(stagingDirPath: Path): Long = { - // We cannot use the tokens generated above since those have renewer yarn. Trying to renew - // those will fail with an access control issue. So create new tokens with the logged in - // user as renewer. - val creds = new Credentials() - val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath - YarnSparkHadoopUtil.get.obtainTokensForNamenodes( - nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal"))) - val t = creds.getAllTokens - .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) - .head - val newExpiration = t.renew(hadoopConf) - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal Interval set to $interval") - interval - } - - /** * Set up the environment for launching our ApplicationMaster container. */ private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = { @@ -437,16 +396,7 @@ private[spark] class Client( env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - if (loginFromKeytab) { - val remoteFs = FileSystem.get(hadoopConf) - val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir) - val credentialsFile = "credentials-" + UUID.randomUUID().toString - sparkConf.set( - "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString) - logInfo(s"Credentials file set to: $credentialsFile") - val renewalInterval = getTokenRenewalInterval(stagingDirPath) - sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString) - } + // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) @@ -511,6 +461,7 @@ private[spark] class Client( private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") + val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) val localResources = prepareLocalResources(appStagingDir) @@ -681,24 +632,6 @@ private[spark] class Client( amContainer } - def setupCredentials(): Unit = { - if (args.principal != null) { - require(args.keytab != null, "Keytab must be specified when principal is specified.") - logInfo("Attempting to login to the Kerberos" + - s" using principal: ${args.principal} and keytab: ${args.keytab}") - val f = new File(args.keytab) - // Generate a file name that can be used for the keytab file, that does not conflict - // with any user file. - val keytabFileName = f.getName + "-" + UUID.randomUUID().toString - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) - loginFromKeytab = true - sparkConf.set("spark.yarn.keytab", keytabFileName) - sparkConf.set("spark.yarn.principal", args.principal) - logInfo("Successfully logged into the KDC.") - } - credentials = UserGroupInformation.getCurrentUser.getCredentials - } - /** * Report the state of an application until it has exited, either successfully or * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, @@ -1055,6 +988,46 @@ object Client extends Logging { YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) /** + * Get the list of namenodes the user may access. + */ + private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { + sparkConf.get("spark.yarn.access.namenodes", "") + .split(",") + .map(_.trim()) + .filter(!_.isEmpty) + .map(new Path(_)) + .toSet + } + + private[yarn] def getTokenRenewer(conf: Configuration): String = { + val delegTokenRenewer = Master.getMasterPrincipal(conf) + logDebug("delegation token renewer is: " + delegTokenRenewer) + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new SparkException(errorMessage) + } + delegTokenRenewer + } + + /** + * Obtains tokens for the namenodes passed in and adds them to the credentials. + */ + private def obtainTokensForNamenodes( + paths: Set[Path], + conf: Configuration, + creds: Credentials): Unit = { + if (UserGroupInformation.isSecurityEnabled()) { + val delegTokenRenewer = getTokenRenewer(conf) + paths.foreach { dst => + val dstFs = dst.getFileSystem(conf) + logDebug("getting token for namenode: " + dst) + dstFs.addDelegationTokens(delegTokenRenewer, creds) + } + } + } + + /** * Obtains token for the Hive metastore and adds them to the credentials. */ private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 5653c9f14d..1423533470 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -42,8 +42,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var amCores: Int = 1 var appName: String = "Spark" var priority = 0 - var principal: String = null - var keytab: String = null def isClusterMode: Boolean = userClass != null private var driverMemory: Int = 512 // MB @@ -233,14 +231,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) archives = value args = tail - case ("--principal") :: value :: tail => - principal = value - args = tail - - case ("--keytab") :: value :: tail => - keytab = value - args = tail - case Nil => case _ => diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a5c454e2e5..5881dc5ffa 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,26 +17,25 @@ package org.apache.spark.deploy.yarn -import java.io._ +import java.io.File import java.util.regex.Matcher import java.util.regex.Pattern import scala.collection.mutable.HashMap import scala.util.Try -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.{Master, JobConf} +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.util.Utils /** @@ -83,48 +82,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { if (credentials != null) credentials.getSecretKey(new Text(key)) else null } - /** - * Get the list of namenodes the user may access. - */ - def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { - sparkConf.get("spark.yarn.access.namenodes", "") - .split(",") - .map(_.trim()) - .filter(!_.isEmpty) - .map(new Path(_)) - .toSet - } - - def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - delegTokenRenewer - } - - /** - * Obtains tokens for the namenodes passed in and adds them to the credentials. - */ - def obtainTokensForNamenodes( - paths: Set[Path], - conf: Configuration, - creds: Credentials, - renewer: Option[String] = None - ): Unit = { - if (UserGroupInformation.isSecurityEnabled()) { - val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf)) - paths.foreach { dst => - val dstFs = dst.getFileSystem(conf) - logInfo("getting token for namenode: " + dst) - dstFs.addDelegationTokens(delegTokenRenewer, creds) - } - } - } - } object YarnSparkHadoopUtil { @@ -143,14 +100,6 @@ object YarnSparkHadoopUtil { // request types (like map/reduce in hadoop for example) val RM_REQUEST_PRIORITY = Priority.newInstance(1) - def get: YarnSparkHadoopUtil = { - val yarnMode = java.lang.Boolean.valueOf( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if (!yarnMode) { - throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!") - } - SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] - } /** * Add a path variable to the given environment map. * If the map already contains this key, append the value to the existing value instead. @@ -263,4 +212,3 @@ object YarnSparkHadoopUtil { classPathSeparatorField.get(null).asInstanceOf[String] } } - diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 508819e242..a51c2005cb 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -151,6 +151,57 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll { } } + test("check access nns empty") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "") + val nns = Client.getNameNodesToAccess(sparkConf) + nns should be(Set()) + } + + test("check access nns unset") { + val sparkConf = new SparkConf() + val nns = Client.getNameNodesToAccess(sparkConf) + nns should be(Set()) + } + + test("check access nns") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032") + val nns = Client.getNameNodesToAccess(sparkConf) + nns should be(Set(new Path("hdfs://nn1:8032"))) + } + + test("check access nns space") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ") + val nns = Client.getNameNodesToAccess(sparkConf) + nns should be(Set(new Path("hdfs://nn1:8032"))) + } + + test("check access two nns") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032") + val nns = Client.getNameNodesToAccess(sparkConf) + nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))) + } + + test("check token renewer") { + val hadoopConf = new Configuration() + hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") + hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM") + val renewer = Client.getTokenRenewer(hadoopConf) + renewer should be ("yarn/myrm:8032@SPARKTEST.COM") + } + + test("check token renewer default") { + val hadoopConf = new Configuration() + val caught = + intercept[SparkException] { + Client.getTokenRenewer(hadoopConf) + } + assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") + } + object Fixtures { val knownDefYarnAppCP: Seq[String] = diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d3c606e0ed..3877da4120 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -86,7 +86,6 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit tempDir = Utils.createTempDir() logConfDir = new File(tempDir, "log4j") logConfDir.mkdir() - System.setProperty("SPARK_YARN_MODE", "true") val logConfFile = new File(logConfDir, "log4j.properties") Files.write(LOG4J_CONF, logConfFile, UTF_8) @@ -129,7 +128,6 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit override def afterAll() { yarnCluster.stop() - System.clearProperty("SPARK_YARN_MODE") super.afterAll() } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index e10b985c3c..9395316b71 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -20,8 +20,6 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -29,7 +27,7 @@ import org.scalatest.{FunSuite, Matchers} import org.apache.hadoop.yarn.api.records.ApplicationAccessType -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils @@ -175,62 +173,4 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { YarnSparkHadoopUtil.getClassPathSeparator() should be (":") } } - - test("check access nns empty") { - val sparkConf = new SparkConf() - val util = new YarnSparkHadoopUtil - sparkConf.set("spark.yarn.access.namenodes", "") - val nns = util.getNameNodesToAccess(sparkConf) - nns should be(Set()) - } - - test("check access nns unset") { - val sparkConf = new SparkConf() - val util = new YarnSparkHadoopUtil - val nns = util.getNameNodesToAccess(sparkConf) - nns should be(Set()) - } - - test("check access nns") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032") - val util = new YarnSparkHadoopUtil - val nns = util.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"))) - } - - test("check access nns space") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ") - val util = new YarnSparkHadoopUtil - val nns = util.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"))) - } - - test("check access two nns") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032") - val util = new YarnSparkHadoopUtil - val nns = util.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))) - } - - test("check token renewer") { - val hadoopConf = new Configuration() - hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") - hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM") - val util = new YarnSparkHadoopUtil - val renewer = util.getTokenRenewer(hadoopConf) - renewer should be ("yarn/myrm:8032@SPARKTEST.COM") - } - - test("check token renewer default") { - val hadoopConf = new Configuration() - val util = new YarnSparkHadoopUtil - val caught = - intercept[SparkException] { - util.getTokenRenewer(hadoopConf) - } - assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") - } } |