Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala   203
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala           17
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                     123
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala             10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala         60
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala                  51
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala              2
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala     62
8 files changed, 422 insertions(+), 106 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
new file mode 100644
index 0000000000..9ff02046de
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.deploy.SparkHadoopUtil
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.ThreadUtils
+
+/*
+ * The following methods are primarily meant to make sure long-running apps like Spark
+ * Streaming apps can run without interruption while writing to secure HDFS. The
+ * scheduleLoginFromKeytab method is called on the driver when the
+ * CoarseGrainedSchedulerBackend starts up. This method wakes up a thread that logs into the KDC
+ * once 75% of the renewal interval of the original delegation tokens used for the container
+ * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
+ * pre-specified location - the prefix of which is specified in the sparkConf by
+ * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
+ * to a new file, with a monotonically increasing suffix). After this, the credentials are
+ * updated once 75% of the new tokens renewal interval has elapsed.
+ *
+ * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
+ * validity of the original tokens has elapsed. At that time the executor finds the
+ * credentials file with the latest timestamp and checks if it has read those credentials
+ * before (by keeping track of the suffix of the last file it read). If a new file has
+ * appeared, it will read the credentials and update the currently running UGI with it. This
+ * process repeats once 80% of the validity of the new tokens has elapsed.
+ */
+private[yarn] class AMDelegationTokenRenewer(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration) extends Logging {
+
+ private var lastCredentialsFileSuffix = 0
+
+ private val delegationTokenRenewer =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+ private val hadoopUtil = YarnSparkHadoopUtil.get
+
+ private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
+ private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+
+ /**
+ * Schedule a login from the keytab and principal set using the --principal and --keytab
+ * arguments to spark-submit. This login happens only when the credentials of the current user
+ * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
+ * SparkConf to do the login. This method is a no-op in non-YARN mode.
+ *
+ */
+ private[spark] def scheduleLoginFromKeytab(): Unit = {
+ val principal = sparkConf.get("spark.yarn.principal")
+ val keytab = sparkConf.get("spark.yarn.keytab")
+
+ /**
+ * Schedule re-login and creation of new tokens. If tokens have already expired, this method
+ * will synchronously create new ones.
+ */
+ def scheduleRenewal(runnable: Runnable): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
+ // Run now!
+ if (renewalInterval <= 0) {
+ logInfo("HDFS tokens have expired, creating new tokens now.")
+ runnable.run()
+ } else {
+ logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
+ delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ // This runnable runs periodically, in the AM, to write new delegation tokens to HDFS.
+ val driverTokenRenewerRunnable =
+ new Runnable {
+ override def run(): Unit = {
+ try {
+ writeNewTokensToHDFS(principal, keytab)
+ cleanupOldFiles()
+ } catch {
+ case e: Exception =>
+ // Log the error and try to write new tokens back in an hour
+ logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+ "hour! If this happens too often tasks will fail.", e)
+ delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
+ return
+ }
+ scheduleRenewal(this)
+ }
+ }
+ // Schedule the credentials update. This also handles updating the tokens right now:
+ // if they have already expired, the renewal interval will be <= 0 and the runnable
+ // runs immediately.
+ scheduleRenewal(driverTokenRenewerRunnable)
+ }
+
+ // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
+ // least numFilesToKeep files are kept for safety.
+ private def cleanupOldFiles(): Unit = {
+ import scala.concurrent.duration._
+ try {
+ val remoteFs = FileSystem.get(hadoopConf)
+ val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
+ hadoopUtil.listFilesSorted(
+ remoteFs, credentialsPath.getParent,
+ credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .dropRight(numFilesToKeep)
+ .takeWhile(_.getModificationTime < thresholdTime)
+ .foreach(x => remoteFs.delete(x.getPath, true))
+ } catch {
+ // Such errors are not fatal, so don't throw. Make sure they are logged though
+ case e: Exception =>
+ logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
+ "warnings there may be an issue with your HDFS cluster.")
+ }
+ }
+
+ private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
+ // Keytab is copied by YARN to the working directory of the AM, so full path is
+ // not needed.
+
+ // HACK:
+ // HDFS will not issue new delegation tokens if the Credentials object
+ // passed in already has tokens for that FS, even if those tokens are expired (it only
+ // checks whether tokens exist for the service, not whether they are valid). So the only real
+ // way to get new tokens is to use a fresh Credentials object each time new tokens are
+ // requested, and then copy the new tokens over to the current user's Credentials.
+ // So:
+ // - we login as a different user and get the UGI
+ // - use that UGI to get the tokens (see doAs block below)
+ // - copy the tokens over to the current user's credentials (this will overwrite the tokens
+ // in the current user's Credentials object for this FS).
+ // The login to the KDC happens each time new tokens are required, but this is rare enough
+ // not to worry about (roughly once a day). This keeps the code clearer than logging in
+ // and then re-logging in every time (the HDFS API may not re-login, since we don't use this
+ // UGI directly for HDFS communication).
+ logInfo(s"Attempting to login to KDC using principal: $principal")
+ val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+ logInfo("Successfully logged into KDC.")
+ val tempCreds = keytabLoggedInUGI.getCredentials
+ val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val dst = credentialsPath.getParent
+ keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
+ // Get a copy of the credentials
+ override def run(): Void = {
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+ null
+ }
+ })
+ // Add the temp credentials back to the original ones.
+ UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+ val remoteFs = FileSystem.get(hadoopConf)
+ // If lastCredentialsFileSuffix is 0, the AM has either just started or been restarted. If
+ // it was restarted, credentials files with a larger suffix may already exist, so find the
+ // newest file and update lastCredentialsFileSuffix from it.
+ if (lastCredentialsFileSuffix == 0) {
+ hadoopUtil.listFilesSorted(
+ remoteFs, credentialsPath.getParent,
+ credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.foreach { status =>
+ lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
+ }
+ }
+ val nextSuffix = lastCredentialsFileSuffix + 1
+ val tokenPathStr =
+ sparkConf.get("spark.yarn.credentials.file") +
+ SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
+ val tokenPath = new Path(tokenPathStr)
+ val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+ logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
+ remoteFs.rename(tempTokenPath, tokenPath)
+ logInfo("Delegation token file rename complete.")
+ lastCredentialsFileSuffix = nextSuffix
+ }
+
+ def stop(): Unit = {
+ delegationTokenRenewer.shutdown()
+ }
+}
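
Aside: the core of the class is the scheduleRenewal helper, which either renews
synchronously (when the tokens are already expired) or schedules the renewal at
75% of the remaining interval. A minimal standalone sketch of that pattern, with
illustrative names (RenewalSchedulingSketch, remainingRenewalMillis) that are not
part of the patch; in the real code the interval comes from
hadoopUtil.getTimeFromNowToRenewal:

    import java.util.concurrent.{Executors, TimeUnit}

    import org.apache.spark.util.ThreadUtils

    object RenewalSchedulingSketch {
      private val scheduler = Executors.newSingleThreadScheduledExecutor(
        ThreadUtils.namedThreadFactory("token-renewal-sketch"))

      // Expired tokens (interval <= 0) are renewed synchronously, right now;
      // otherwise the task is scheduled for when the interval elapses.
      def scheduleRenewal(task: Runnable, remainingRenewalMillis: Long): Unit = {
        if (remainingRenewalMillis <= 0) {
          task.run()
        } else {
          scheduler.schedule(task, remainingRenewalMillis, TimeUnit.MILLISECONDS)
        }
      }
    }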
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 70cb57ffd8..82f2b7e1dd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -75,6 +75,8 @@ private[spark] class ApplicationMaster(
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
+ private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+
final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
@@ -125,6 +127,15 @@ private[spark] class ApplicationMaster(
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
+ // If the credentials file config is present, we must periodically renew tokens. So create
+ // a new AMDelegationTokenRenewer
+ if (sparkConf.contains("spark.yarn.credentials.file")) {
+ delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
+ // If a principal and keytab have been set, use that to create new credentials for executors
+ // periodically
+ delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
+ }
+
if (isClusterMode) {
runDriver(securityMgr)
} else {
@@ -189,6 +200,7 @@ private[spark] class ApplicationMaster(
logDebug("shutting down user thread")
userClassThread.interrupt()
}
+ if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
}
}
}
@@ -235,12 +247,12 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
- val driverEndpont = rpcEnv.setupEndpointRef(
+ val driverEndpoint = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
- rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, isClusterMode))
+ rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -494,6 +506,7 @@ private[spark] class ApplicationMaster(
override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
+
}
override def receive: PartialFunction[Any, Unit] = {
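
Aside: the AM treats the renewer as optional state, created only when the
credentials-file config is present. A sketch of that lifecycle (the class name
RenewerLifecycleSketch is illustrative and assumes the same package as
AMDelegationTokenRenewer, which is private[yarn]):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    class RenewerLifecycleSketch(sparkConf: SparkConf, yarnConf: Configuration) {
      private var renewerOption: Option[AMDelegationTokenRenewer] = None

      // Created only if spark.yarn.credentials.file is set, as in
      // ApplicationMaster.run above.
      def start(): Unit = {
        if (sparkConf.contains("spark.yarn.credentials.file")) {
          renewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
          renewerOption.foreach(_.scheduleLoginFromKeytab())
        }
      }

      // foreach makes stop a no-op when no renewer was ever created.
      def stop(): Unit = renewerOption.foreach(_.stop())
    }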
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4abcf7307a..bb126a4637 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,9 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.{File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
@@ -36,7 +38,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{TokenIdentifier, Token}
@@ -50,8 +51,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -69,11 +70,13 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
- private val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ private var credentials: Credentials = null
private val amMemoryOverhead = args.amMemoryOverhead // MB
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
private val isClusterMode = args.isClusterMode
+
+ private var loginFromKeytab = false
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
@@ -88,6 +91,8 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
+ // Set up the credentials before doing anything else, so we don't have issues at any point.
+ setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
@@ -219,12 +224,12 @@ private[spark] class Client(
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val nns = getNameNodesToAccess(sparkConf) + dst
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
- obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
obtainTokenForHBase(hadoopConf, credentials)
@@ -243,6 +248,20 @@ private[spark] class Client(
"for alternatives.")
}
+ // If a keytab was passed in, copy it to the staging directory on HDFS and set up the
+ // relevant environment, so the AM can log in again.
+ if (loginFromKeytab) {
+ logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+ " via the YARN Secure Distributed Cache.")
+ val localUri = new URI(args.keytab)
+ val localPath = getQualifiedLocalPath(localUri, hadoopConf)
+ val destinationPath = copyFileToRemote(dst, localPath, replication)
+ val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(
+ destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
+ sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
+ }
+
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -386,6 +405,28 @@ private[spark] class Client(
}
/**
+ * Get the renewal interval for tokens.
+ */
+ private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
+ // We cannot use the tokens generated above, since their renewer is the YARN RM. Trying to
+ // renew those will fail with an access control error. So create new tokens with the
+ // logged-in user as the renewer.
+ val creds = new Credentials()
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
+ YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
+ nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
+ val t = creds.getAllTokens
+ .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .head
+ val newExpiration = t.renew(hadoopConf)
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ val interval = newExpiration - identifier.getIssueDate
+ logInfo(s"Renewal Interval set to $interval")
+ interval
+ }
+
+ /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
@@ -396,7 +437,16 @@ private[spark] class Client(
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-
+ if (loginFromKeytab) {
+ val remoteFs = FileSystem.get(hadoopConf)
+ val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
+ val credentialsFile = "credentials-" + UUID.randomUUID().toString
+ sparkConf.set(
+ "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
+ logInfo(s"Credentials file set to: $credentialsFile")
+ val renewalInterval = getTokenRenewalInterval(stagingDirPath)
+ sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
+ }
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -461,7 +511,6 @@ private[spark] class Client(
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
-
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
@@ -632,6 +681,24 @@ private[spark] class Client(
amContainer
}
+ def setupCredentials(): Unit = {
+ if (args.principal != null) {
+ require(args.keytab != null, "Keytab must be specified when principal is specified.")
+ logInfo("Attempting to login to the Kerberos" +
+ s" using principal: ${args.principal} and keytab: ${args.keytab}")
+ val f = new File(args.keytab)
+ // Generate a file name for the keytab that does not conflict
+ // with any user file.
+ val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ loginFromKeytab = true
+ sparkConf.set("spark.yarn.keytab", keytabFileName)
+ sparkConf.set("spark.yarn.principal", args.principal)
+ logInfo("Successfully logged into the KDC.")
+ }
+ credentials = UserGroupInformation.getCurrentUser.getCredentials
+ }
+
/**
* Report the state of an application until it has exited, either successfully or
* due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
@@ -988,46 +1055,6 @@ object Client extends Logging {
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
- * Get the list of namenodes the user may access.
- */
- private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get("spark.yarn.access.namenodes", "")
- .split(",")
- .map(_.trim())
- .filter(!_.isEmpty)
- .map(new Path(_))
- .toSet
- }
-
- private[yarn] def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
- delegTokenRenewer
- }
-
- /**
- * Obtains tokens for the namenodes passed in and adds them to the credentials.
- */
- private def obtainTokensForNamenodes(
- paths: Set[Path],
- conf: Configuration,
- creds: Credentials): Unit = {
- if (UserGroupInformation.isSecurityEnabled()) {
- val delegTokenRenewer = getTokenRenewer(conf)
- paths.foreach { dst =>
- val dstFs = dst.getFileSystem(conf)
- logDebug("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(delegTokenRenewer, creds)
- }
- }
- }
-
- /**
* Obtains token for the Hive metastore and adds them to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
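
Aside: getTokenRenewalInterval works by actually renewing a token once and
measuring. Distilled into a self-contained helper (renewalIntervalOf is an
illustrative name, not part of the patch), under the assumption that the
current user is the token's renewer:

    import java.io.{ByteArrayInputStream, DataInputStream}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
    import org.apache.hadoop.security.token.{Token, TokenIdentifier}

    // Renew the token once, decode its identifier from the raw bytes, and take
    // (new expiration - issue date) as the cluster's renewal interval in ms.
    def renewalIntervalOf(
        token: Token[_ <: TokenIdentifier],
        conf: Configuration): Long = {
      val newExpiration = token.renew(conf)
      val identifier = new DelegationTokenIdentifier()
      identifier.readFields(new DataInputStream(new ByteArrayInputStream(token.getIdentifier)))
      newExpiration - identifier.getIssueDate
    }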
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1423533470..5653c9f14d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -42,6 +42,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
+ var principal: String = null
+ var keytab: String = null
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
@@ -231,6 +233,14 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
archives = value
args = tail
+ case ("--principal") :: value :: tail =>
+ principal = value
+ args = tail
+
+ case ("--keytab") :: value :: tail =>
+ keytab = value
+ args = tail
+
case Nil =>
case _ =>
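
Aside: the new flags slot into ClientArguments' List-based pattern match. A
simplified recursive rendering of that parsing style (parseArgs and the Map
accumulator are illustrative; the real class mutates fields and rejects
unrecognized flags):

    def parseArgs(
        args: List[String],
        parsed: Map[String, String] = Map.empty): Map[String, String] = {
      args match {
        case "--principal" :: value :: tail => parseArgs(tail, parsed + ("principal" -> value))
        case "--keytab" :: value :: tail => parseArgs(tail, parsed + ("keytab" -> value))
        case Nil => parsed
        case _ :: tail => parseArgs(tail, parsed) // sketch only: skip anything unrecognized
      }
    }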
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5881dc5ffa..a5c454e2e5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,25 +17,26 @@
package org.apache.spark.deploy.yarn
-import java.io.File
+import java.io._
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.HashMap
import scala.util.Try
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{JobConf, Master}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
/**
@@ -82,6 +83,48 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
if (credentials != null) credentials.getSecretKey(new Text(key)) else null
}
+ /**
+ * Get the list of namenodes the user may access.
+ */
+ def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get("spark.yarn.access.namenodes", "")
+ .split(",")
+ .map(_.trim())
+ .filter(!_.isEmpty)
+ .map(new Path(_))
+ .toSet
+ }
+
+ def getTokenRenewer(conf: Configuration): String = {
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ logDebug("delegation token renewer is: " + delegTokenRenewer)
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ }
+ delegTokenRenewer
+ }
+
+ /**
+ * Obtains tokens for the namenodes passed in and adds them to the credentials.
+ */
+ def obtainTokensForNamenodes(
+ paths: Set[Path],
+ conf: Configuration,
+ creds: Credentials,
+ renewer: Option[String] = None
+ ): Unit = {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
+ paths.foreach { dst =>
+ val dstFs = dst.getFileSystem(conf)
+ logInfo("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ }
+ }
+ }
+
}
object YarnSparkHadoopUtil {
@@ -100,6 +143,14 @@ object YarnSparkHadoopUtil {
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = Priority.newInstance(1)
+ def get: YarnSparkHadoopUtil = {
+ val yarnMode = java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if (!yarnMode) {
+ throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!")
+ }
+ SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
+ }
/**
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.
@@ -212,3 +263,4 @@ object YarnSparkHadoopUtil {
classPathSeparatorField.get(null).asInstanceOf[String]
}
}
+
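
Aside: with the helpers moved onto YarnSparkHadoopUtil, token acquisition is a
two-step call. A usage sketch for a kerberized cluster (sparkConf, hadoopConf
and stagingDirPath are assumed to be in scope; the principal value is whatever
was stored by Client.setupCredentials):

    import org.apache.hadoop.security.Credentials

    val util = YarnSparkHadoopUtil.get
    val creds = new Credentials()
    val nns = util.getNameNodesToAccess(sparkConf) + stagingDirPath
    // Default renewer: the RM principal from getTokenRenewer.
    util.obtainTokensForNamenodes(nns, hadoopConf, creds)
    // Explicit renewer: what getTokenRenewalInterval uses, so the submitting
    // user can renew the resulting tokens.
    util.obtainTokensForNamenodes(nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))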
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index a51c2005cb..508819e242 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,57 +151,6 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
}
}
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = Client.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val caught =
- intercept[SparkException] {
- Client.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 3877da4120..d3c606e0ed 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -86,6 +86,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
tempDir = Utils.createTempDir()
logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
+ System.setProperty("SPARK_YARN_MODE", "true")
val logConfFile = new File(logConfDir, "log4j.properties")
Files.write(LOG4J_CONF, logConfFile, UTF_8)
@@ -128,6 +129,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
override def afterAll() {
yarnCluster.stop()
+ System.clearProperty("SPARK_YARN_MODE")
super.afterAll()
}
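
Aside: the property dance here exists because YarnSparkHadoopUtil.get (added
above) throws a SparkException unless SPARK_YARN_MODE is set. Any suite that
reaches it needs a bracket along these lines (a sketch using ScalaTest's
BeforeAndAfterAll, which this suite already mixes in):

    override def beforeAll(): Unit = {
      System.setProperty("SPARK_YARN_MODE", "true")
      super.beforeAll()
    }

    override def afterAll(): Unit = {
      try {
        System.clearProperty("SPARK_YARN_MODE")
      } finally {
        super.afterAll()
      }
    }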
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 9395316b71..e10b985c3c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -27,7 +29,7 @@ import org.scalatest.{FunSuite, Matchers}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
@@ -173,4 +175,62 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
}
}
+
+ test("check access nns empty") {
+ val sparkConf = new SparkConf()
+ val util = new YarnSparkHadoopUtil
+ sparkConf.set("spark.yarn.access.namenodes", "")
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns unset") {
+ val sparkConf = new SparkConf()
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access nns space") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access two nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val util = new YarnSparkHadoopUtil
+ val renewer = util.getTokenRenewer(hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val util = new YarnSparkHadoopUtil
+ val caught =
+ intercept[SparkException] {
+ util.getTokenRenewer(hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
}