aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2015-04-30 13:03:23 -0500
committerThomas Graves <tgraves@apache.org>2015-04-30 13:03:23 -0500
commit6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c (patch)
tree662e09f980e9eeaff8bd572537f9c2549c193337 /yarn
parent7dacc08ab36188991a001df23880167433844767 (diff)
downloadspark-6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.tar.gz
spark-6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.tar.bz2
spark-6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.zip
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS
Current Spark apps running on Secure YARN/HDFS would not be able to write data to HDFS after 7 days, since delegation tokens cannot be renewed beyond that. This means Spark Streaming apps will not be able to run on Secure YARN. This commit adds basic functionality to fix this issue. In this patch: - new parameters are added - principal and keytab, which can be used to login to a KDC - the client logs in, and then get tokens to start the AM - the keytab is copied to the staging directory - the AM waits for 60% of the time till expiry of the tokens and then logs in using the keytab - each time after 60% of the time, new tokens are created and sent to the executors Currently, to avoid complicating the architecture, we set the keytab and principal in the SparkHadoopUtil singleton, and schedule a login. Once the login is completed, a callback is scheduled. This is being posted for feedback, so I can gather feedback on the general implementation. There are currently a bunch of things to do: - [x] logging - [x] testing - I plan to manually test this soon. If you have ideas of how to add unit tests, comment. - [x] add code to ensure that if these params are set in non-YARN cluster mode, we complain - [x] documentation - [x] Have the executors request for credentials from the AM, so that retries are possible. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #4688 from harishreedharan/kerberos-longrunning and squashes the following commits: 36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments. 611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens. 09fe224 [Hari Shreedharan] Use token.renew to get token's renewal interval rather than using hdfs-site.xml 6963bbc [Hari Shreedharan] Schedule renewal in AM before starting user class. Else, a restarted AM cannot access HDFS if the user class tries to. 072659e [Hari Shreedharan] Fix build failure caused by thread factory getting moved to ThreadUtils. f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens. ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates. 7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required. e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message. 0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 7f1bc58 [Hari Shreedharan] Minor fixes, cleanup. bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup. f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files. 2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pikc up the latest suffix from HDFS if the AM is restarted. 61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS. 62c45ce [Hari Shreedharan] Relogin from keytab periodically. fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues. 42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master. 0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity. 9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes. f4fd711 [Hari Shreedharan] Fix SparkConf usage. 2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough for it to cause issues on HDFS. af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required. f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file. f6954da [Hari Shreedharan] Got rid of Akka communication to renew, instead the executors check a known file's modification time to read the credentials. 5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues. b4cb917 [Hari Shreedharan] Send keytab to AM via DistributedCache rather than directly via HDFS 0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire. d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens() 8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object. fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start() 41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests. bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with impl in YarnSparkHadoopUtil. f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled. 2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS. ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos 77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala203
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala17
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala123
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala10
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala60
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala51
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala2
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala62
8 files changed, 422 insertions, 106 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
new file mode 100644
index 0000000000..9ff02046de
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.deploy.SparkHadoopUtil
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.ThreadUtils
+
+/*
+ * The following methods are primarily meant to make sure long-running apps like Spark
+ * Streaming apps can run without interruption while writing to secure HDFS. The
+ * scheduleLoginFromKeytab method is called on the driver when the
+ * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC
+ * once 75% of the renewal interval of the original delegation tokens used for the container
+ * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
+ * pre-specified location - the prefix of which is specified in the sparkConf by
+ * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
+ * to a new file, with a monotonically increasing suffix). After this, the credentials are
+ * updated once 75% of the new tokens renewal interval has elapsed.
+ *
+ * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
+ * validity of the original tokens has elapsed. At that time the executor finds the
+ * credentials file with the latest timestamp and checks if it has read those credentials
+ * before (by keeping track of the suffix of the last file it read). If a new file has
+ * appeared, it will read the credentials and update the currently running UGI with it. This
+ * process happens again once 80% of the validity of this has expired.
+ */
+private[yarn] class AMDelegationTokenRenewer(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration) extends Logging {
+
+ private var lastCredentialsFileSuffix = 0
+
+ private val delegationTokenRenewer =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+ private val hadoopUtil = YarnSparkHadoopUtil.get
+
+ private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
+ private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+
+ /**
+ * Schedule a login from the keytab and principal set using the --principal and --keytab
+ * arguments to spark-submit. This login happens only when the credentials of the current user
+ * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
+ * SparkConf to do the login. This method is a no-op in non-YARN mode.
+ *
+ */
+ private[spark] def scheduleLoginFromKeytab(): Unit = {
+ val principal = sparkConf.get("spark.yarn.principal")
+ val keytab = sparkConf.get("spark.yarn.keytab")
+
+ /**
+ * Schedule re-login and creation of new tokens. If tokens have already expired, this method
+ * will synchronously create new ones.
+ */
+ def scheduleRenewal(runnable: Runnable): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
+ // Run now!
+ if (renewalInterval <= 0) {
+ logInfo("HDFS tokens have expired, creating new tokens now.")
+ runnable.run()
+ } else {
+ logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
+ delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ // This thread periodically runs on the driver to update the delegation tokens on HDFS.
+ val driverTokenRenewerRunnable =
+ new Runnable {
+ override def run(): Unit = {
+ try {
+ writeNewTokensToHDFS(principal, keytab)
+ cleanupOldFiles()
+ } catch {
+ case e: Exception =>
+ // Log the error and try to write new tokens back in an hour
+ logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+ "hour! If this happens too often tasks will fail.", e)
+ delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
+ return
+ }
+ scheduleRenewal(this)
+ }
+ }
+ // Schedule update of credentials. This handles the case of updating the tokens right now
+ // as well, since the renenwal interval will be 0, and the thread will get scheduled
+ // immediately.
+ scheduleRenewal(driverTokenRenewerRunnable)
+ }
+
+ // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
+ // least numFilesToKeep files are kept for safety
+ private def cleanupOldFiles(): Unit = {
+ import scala.concurrent.duration._
+ try {
+ val remoteFs = FileSystem.get(hadoopConf)
+ val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
+ hadoopUtil.listFilesSorted(
+ remoteFs, credentialsPath.getParent,
+ credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .dropRight(numFilesToKeep)
+ .takeWhile(_.getModificationTime < thresholdTime)
+ .foreach(x => remoteFs.delete(x.getPath, true))
+ } catch {
+ // Such errors are not fatal, so don't throw. Make sure they are logged though
+ case e: Exception =>
+ logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
+ "warnings there may be an issue with your HDFS cluster.")
+ }
+ }
+
+ private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
+ // Keytab is copied by YARN to the working directory of the AM, so full path is
+ // not needed.
+
+ // HACK:
+ // HDFS will not issue new delegation tokens, if the Credentials object
+ // passed in already has tokens for that FS even if the tokens are expired (it really only
+ // checks if there are tokens for the service, and not if they are valid). So the only real
+ // way to get new tokens is to make sure a different Credentials object is used each time to
+ // get new tokens and then the new tokens are copied over the the current user's Credentials.
+ // So:
+ // - we login as a different user and get the UGI
+ // - use that UGI to get the tokens (see doAs block below)
+ // - copy the tokens over to the current user's credentials (this will overwrite the tokens
+ // in the current user's Credentials object for this FS).
+ // The login to KDC happens each time new tokens are required, but this is rare enough to not
+ // have to worry about (like once every day or so). This makes this code clearer than having
+ // to login and then relogin every time (the HDFS API may not relogin since we don't use this
+ // UGI directly for HDFS communication.
+ logInfo(s"Attempting to login to KDC using principal: $principal")
+ val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+ logInfo("Successfully logged into KDC.")
+ val tempCreds = keytabLoggedInUGI.getCredentials
+ val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val dst = credentialsPath.getParent
+ keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
+ // Get a copy of the credentials
+ override def run(): Void = {
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+ null
+ }
+ })
+ // Add the temp credentials back to the original ones.
+ UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+ val remoteFs = FileSystem.get(hadoopConf)
+ // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
+ // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
+ // and update the lastCredentialsFileSuffix.
+ if (lastCredentialsFileSuffix == 0) {
+ hadoopUtil.listFilesSorted(
+ remoteFs, credentialsPath.getParent,
+ credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.foreach { status =>
+ lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
+ }
+ }
+ val nextSuffix = lastCredentialsFileSuffix + 1
+ val tokenPathStr =
+ sparkConf.get("spark.yarn.credentials.file") +
+ SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
+ val tokenPath = new Path(tokenPathStr)
+ val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+ logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
+ remoteFs.rename(tempTokenPath, tokenPath)
+ logInfo("Delegation token file rename complete.")
+ lastCredentialsFileSuffix = nextSuffix
+ }
+
+ def stop(): Unit = {
+ delegationTokenRenewer.shutdown()
+ }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 70cb57ffd8..82f2b7e1dd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -75,6 +75,8 @@ private[spark] class ApplicationMaster(
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
+ private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+
final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
@@ -125,6 +127,15 @@ private[spark] class ApplicationMaster(
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
+ // If the credentials file config is present, we must periodically renew tokens. So create
+ // a new AMDelegationTokenRenewer
+ if (sparkConf.contains("spark.yarn.credentials.file")) {
+ delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
+ // If a principal and keytab have been set, use that to create new credentials for executors
+ // periodically
+ delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
+ }
+
if (isClusterMode) {
runDriver(securityMgr)
} else {
@@ -189,6 +200,7 @@ private[spark] class ApplicationMaster(
logDebug("shutting down user thread")
userClassThread.interrupt()
}
+ if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
}
}
}
@@ -235,12 +247,12 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
- val driverEndpont = rpcEnv.setupEndpointRef(
+ val driverEndpoint = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
- rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, isClusterMode))
+ rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -494,6 +506,7 @@ private[spark] class ApplicationMaster(
override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
+
}
override def receive: PartialFunction[Any, Unit] = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4abcf7307a..bb126a4637 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,9 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.{File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
@@ -36,7 +38,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{TokenIdentifier, Token}
@@ -50,8 +51,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -69,11 +70,13 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
- private val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ private var credentials: Credentials = null
private val amMemoryOverhead = args.amMemoryOverhead // MB
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
private val isClusterMode = args.isClusterMode
+
+ private var loginFromKeytab = false
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
@@ -88,6 +91,8 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
+ // Setup the credentials before doing anything else, so we have don't have issues at any point.
+ setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
@@ -219,12 +224,12 @@ private[spark] class Client(
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val nns = getNameNodesToAccess(sparkConf) + dst
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
- obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
obtainTokenForHBase(hadoopConf, credentials)
@@ -243,6 +248,20 @@ private[spark] class Client(
"for alternatives.")
}
+ // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+ // HDFS, and setup the relevant environment vars, so the AM can login again.
+ if (loginFromKeytab) {
+ logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+ " via the YARN Secure Distributed Cache.")
+ val localUri = new URI(args.keytab)
+ val localPath = getQualifiedLocalPath(localUri, hadoopConf)
+ val destinationPath = copyFileToRemote(dst, localPath, replication)
+ val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(
+ destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
+ sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
+ }
+
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -386,6 +405,28 @@ private[spark] class Client(
}
/**
+ * Get the renewal interval for tokens.
+ */
+ private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
+ // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
+ // those will fail with an access control issue. So create new tokens with the logged in
+ // user as renewer.
+ val creds = new Credentials()
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
+ YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
+ nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
+ val t = creds.getAllTokens
+ .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .head
+ val newExpiration = t.renew(hadoopConf)
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ val interval = newExpiration - identifier.getIssueDate
+ logInfo(s"Renewal Interval set to $interval")
+ interval
+ }
+
+ /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
@@ -396,7 +437,16 @@ private[spark] class Client(
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-
+ if (loginFromKeytab) {
+ val remoteFs = FileSystem.get(hadoopConf)
+ val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
+ val credentialsFile = "credentials-" + UUID.randomUUID().toString
+ sparkConf.set(
+ "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
+ logInfo(s"Credentials file set to: $credentialsFile")
+ val renewalInterval = getTokenRenewalInterval(stagingDirPath)
+ sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
+ }
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -461,7 +511,6 @@ private[spark] class Client(
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
-
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
@@ -632,6 +681,24 @@ private[spark] class Client(
amContainer
}
+ def setupCredentials(): Unit = {
+ if (args.principal != null) {
+ require(args.keytab != null, "Keytab must be specified when principal is specified.")
+ logInfo("Attempting to login to the Kerberos" +
+ s" using principal: ${args.principal} and keytab: ${args.keytab}")
+ val f = new File(args.keytab)
+ // Generate a file name that can be used for the keytab file, that does not conflict
+ // with any user file.
+ val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ loginFromKeytab = true
+ sparkConf.set("spark.yarn.keytab", keytabFileName)
+ sparkConf.set("spark.yarn.principal", args.principal)
+ logInfo("Successfully logged into the KDC.")
+ }
+ credentials = UserGroupInformation.getCurrentUser.getCredentials
+ }
+
/**
* Report the state of an application until it has exited, either successfully or
* due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
@@ -988,46 +1055,6 @@ object Client extends Logging {
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
- * Get the list of namenodes the user may access.
- */
- private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get("spark.yarn.access.namenodes", "")
- .split(",")
- .map(_.trim())
- .filter(!_.isEmpty)
- .map(new Path(_))
- .toSet
- }
-
- private[yarn] def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
- delegTokenRenewer
- }
-
- /**
- * Obtains tokens for the namenodes passed in and adds them to the credentials.
- */
- private def obtainTokensForNamenodes(
- paths: Set[Path],
- conf: Configuration,
- creds: Credentials): Unit = {
- if (UserGroupInformation.isSecurityEnabled()) {
- val delegTokenRenewer = getTokenRenewer(conf)
- paths.foreach { dst =>
- val dstFs = dst.getFileSystem(conf)
- logDebug("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(delegTokenRenewer, creds)
- }
- }
- }
-
- /**
* Obtains token for the Hive metastore and adds them to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1423533470..5653c9f14d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -42,6 +42,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
+ var principal: String = null
+ var keytab: String = null
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
@@ -231,6 +233,14 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
archives = value
args = tail
+ case ("--principal") :: value :: tail =>
+ principal = value
+ args = tail
+
+ case ("--keytab") :: value :: tail =>
+ keytab = value
+ args = tail
+
case Nil =>
case _ =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5881dc5ffa..a5c454e2e5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,25 +17,26 @@
package org.apache.spark.deploy.yarn
-import java.io.File
+import java.io._
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.HashMap
import scala.util.Try
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{Master, JobConf}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
/**
@@ -82,6 +83,48 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
if (credentials != null) credentials.getSecretKey(new Text(key)) else null
}
+ /**
+ * Get the list of namenodes the user may access.
+ */
+ def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get("spark.yarn.access.namenodes", "")
+ .split(",")
+ .map(_.trim())
+ .filter(!_.isEmpty)
+ .map(new Path(_))
+ .toSet
+ }
+
+ def getTokenRenewer(conf: Configuration): String = {
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ logDebug("delegation token renewer is: " + delegTokenRenewer)
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ }
+ delegTokenRenewer
+ }
+
+ /**
+ * Obtains tokens for the namenodes passed in and adds them to the credentials.
+ */
+ def obtainTokensForNamenodes(
+ paths: Set[Path],
+ conf: Configuration,
+ creds: Credentials,
+ renewer: Option[String] = None
+ ): Unit = {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
+ paths.foreach { dst =>
+ val dstFs = dst.getFileSystem(conf)
+ logInfo("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ }
+ }
+ }
+
}
object YarnSparkHadoopUtil {
@@ -100,6 +143,14 @@ object YarnSparkHadoopUtil {
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = Priority.newInstance(1)
+ def get: YarnSparkHadoopUtil = {
+ val yarnMode = java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if (!yarnMode) {
+ throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!")
+ }
+ SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
+ }
/**
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.
@@ -212,3 +263,4 @@ object YarnSparkHadoopUtil {
classPathSeparatorField.get(null).asInstanceOf[String]
}
}
+
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index a51c2005cb..508819e242 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,57 +151,6 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
}
}
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = Client.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val caught =
- intercept[SparkException] {
- Client.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 3877da4120..d3c606e0ed 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -86,6 +86,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
tempDir = Utils.createTempDir()
logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
+ System.setProperty("SPARK_YARN_MODE", "true")
val logConfFile = new File(logConfDir, "log4j.properties")
Files.write(LOG4J_CONF, logConfFile, UTF_8)
@@ -128,6 +129,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
override def afterAll() {
yarnCluster.stop()
+ System.clearProperty("SPARK_YARN_MODE")
super.afterAll()
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 9395316b71..e10b985c3c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -27,7 +29,7 @@ import org.scalatest.{FunSuite, Matchers}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
@@ -173,4 +175,62 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
}
}
+
+ test("check access nns empty") {
+ val sparkConf = new SparkConf()
+ val util = new YarnSparkHadoopUtil
+ sparkConf.set("spark.yarn.access.namenodes", "")
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns unset") {
+ val sparkConf = new SparkConf()
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access nns space") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access two nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val util = new YarnSparkHadoopUtil
+ val renewer = util.getTokenRenewer(hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val util = new YarnSparkHadoopUtil
+ val caught =
+ intercept[SparkException] {
+ util.getTokenRenewer(hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
}