author    Hari Shreedharan <hshreedharan@apache.org>    2015-05-01 15:32:09 -0500
committer Thomas Graves <tgraves@apache.org>    2015-05-01 15:32:09 -0500
commit    b1f4ca82d170935d15f1fe6beb9af0743b4d81cd (patch)
tree      db2a1fb19feaa2c28e201c4bdce1f7a2b864f794 /yarn
parent    4dc8d74491b101a794cf8d386d8c5ebc6019b75f (diff)
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS
Take 2. Does the same thing as #4688, but fixes the Hadoop-1 build.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #5823 from harishreedharan/kerberos-longrunning and squashes the following commits:

3c86bba [Hari Shreedharan] Import fixes. Import postfixOps explicitly.
4d04301 [Hari Shreedharan] Minor formatting fixes.
b5e7a72 [Hari Shreedharan] Remove reflection, use a method in SparkHadoopUtil to update the token renewer.
7bff6e9 [Hari Shreedharan] Make sure all required classes are present in the jar. Fix import order.
e851f70 [Hari Shreedharan] Move the ExecutorDelegationTokenRenewer to yarn module. Use reflection to use it.
36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments.
611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens.
09fe224 [Hari Shreedharan] Use token.renew to get token's renewal interval rather than using hdfs-site.xml
6963bbc [Hari Shreedharan] Schedule renewal in AM before starting user class. Else, a restarted AM cannot access HDFS if the user class tries to.
072659e [Hari Shreedharan] Fix build failure caused by thread factory getting moved to ThreadUtils.
f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens.
ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates.
7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required.
e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message.
0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
7f1bc58 [Hari Shreedharan] Minor fixes, cleanup.
bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup.
f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files.
2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pick up the latest suffix from HDFS if the AM is restarted.
61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS.
62c45ce [Hari Shreedharan] Relogin from keytab periodically.
fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues.
42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master.
0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity.
9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes.
f4fd711 [Hari Shreedharan] Fix SparkConf usage.
2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough that they should not cause issues on HDFS.
af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required.
f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file.
f6954da [Hari Shreedharan] Got rid of Akka communication to renew; instead the executors check a known file's modification time to read the credentials.
5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues.
b4cb917 [Hari Shreedharan] Send keytab to AM via DistributedCache rather than directly via HDFS
0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire.
d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens()
8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object.
fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start()
41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests.
bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with impl in YarnSparkHadoopUtil.
f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled.
2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS.
ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos
77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
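
For context, the feature is driven by the two new client arguments added below (--principal and --keytab). A purely illustrative invocation might look like the following; the principal, keytab path, and application names are made up, and whether spark-submit forwards these flags directly depends on the core-side half of this change, which is outside the yarn/ diff shown here:

    spark-submit --master yarn-cluster \
      --principal user@EXAMPLE.COM \
      --keytab /etc/security/keytabs/user.keytab \
      --class org.example.StreamingApp streaming-app.jar

On submission the Client logs in from the keytab, records spark.yarn.principal and spark.yarn.keytab in the SparkConf, ships the keytab to the AM via the distributed cache, and points spark.yarn.credentials.file at a staging-directory path that the AM periodically refreshes and the executors poll.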
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala       205
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala               17
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                         123
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                 10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala  106
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala             69
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala                     51
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala                 2
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala        62
9 files changed, 540 insertions, 105 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
new file mode 100644
index 0000000000..aaae6f9734
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.language.postfixOps
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.deploy.SparkHadoopUtil
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.ThreadUtils
+
+/*
+ * The following methods are primarily meant to make sure long-running apps like Spark
+ * Streaming apps can run without interruption while writing to secure HDFS. The
+ * scheduleLoginFromKeytab method is called on the driver when the
+ * CoarseGrainedSchedulerBackend starts up. This method wakes up a thread that logs into the KDC
+ * once 75% of the renewal interval of the original delegation tokens used for the container
+ * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
+ * pre-specified location - the prefix of which is specified in the sparkConf by
+ * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
+ * to a new file, with a monotonically increasing suffix). After this, the credentials are
+ * updated once 75% of the new tokens' renewal interval has elapsed.
+ *
+ * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
+ * validity of the original tokens has elapsed. At that time the executor finds the
+ * credentials file with the latest timestamp and checks if it has read those credentials
+ * before (by keeping track of the suffix of the last file it read). If a new file has
+ * appeared, it will read the credentials and update the currently running UGI with it. This
+ * process repeats once 80% of the validity of the new tokens has elapsed.
+ */
+private[yarn] class AMDelegationTokenRenewer(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration) extends Logging {
+
+ private var lastCredentialsFileSuffix = 0
+
+ private val delegationTokenRenewer =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+ private val hadoopUtil = YarnSparkHadoopUtil.get
+
+ private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
+ private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+
+ /**
+ * Schedule a login from the keytab and principal set using the --principal and --keytab
+ * arguments to spark-submit. This login happens only when the credentials of the current user
+ * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
+ * SparkConf to do the login. This method is a no-op in non-YARN mode.
+ *
+ */
+ private[spark] def scheduleLoginFromKeytab(): Unit = {
+ val principal = sparkConf.get("spark.yarn.principal")
+ val keytab = sparkConf.get("spark.yarn.keytab")
+
+ /**
+ * Schedule re-login and creation of new tokens. If tokens have already expired, this method
+ * will synchronously create new ones.
+ */
+ def scheduleRenewal(runnable: Runnable): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
+ // Run now!
+ if (renewalInterval <= 0) {
+ logInfo("HDFS tokens have expired, creating new tokens now.")
+ runnable.run()
+ } else {
+ logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
+ delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ // This thread periodically runs on the driver to update the delegation tokens on HDFS.
+ val driverTokenRenewerRunnable =
+ new Runnable {
+ override def run(): Unit = {
+ try {
+ writeNewTokensToHDFS(principal, keytab)
+ cleanupOldFiles()
+ } catch {
+ case e: Exception =>
+ // Log the error and try to write new tokens back in an hour
+ logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+ "hour! If this happens too often tasks will fail.", e)
+ delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
+ return
+ }
+ scheduleRenewal(this)
+ }
+ }
+ // Schedule update of credentials. This handles the case of updating the tokens right now
+ // as well, since the renewal interval will be 0, and the thread will get scheduled
+ // immediately.
+ scheduleRenewal(driverTokenRenewerRunnable)
+ }
+
+ // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
+ // least numFilesToKeep files are kept for safety
+ private def cleanupOldFiles(): Unit = {
+ import scala.concurrent.duration._
+ try {
+ val remoteFs = FileSystem.get(hadoopConf)
+ val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
+ hadoopUtil.listFilesSorted(
+ remoteFs, credentialsPath.getParent,
+ credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .dropRight(numFilesToKeep)
+ .takeWhile(_.getModificationTime < thresholdTime)
+ .foreach(x => remoteFs.delete(x.getPath, true))
+ } catch {
+ // Such errors are not fatal, so don't throw. Make sure they are logged though
+ case e: Exception =>
+ logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
+ "warnings there may be an issue with your HDFS cluster.", e)
+ }
+ }
+
+ private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
+ // Keytab is copied by YARN to the working directory of the AM, so full path is
+ // not needed.
+
+ // HACK:
+ // HDFS will not issue new delegation tokens if the Credentials object
+ // passed in already has tokens for that FS, even if the tokens are expired (it really only
+ // checks if there are tokens for the service, and not if they are valid). So the only real
+ // way to get new tokens is to make sure a different Credentials object is used each time to
+ // get new tokens, and then the new tokens are copied over to the current user's Credentials.
+ // So:
+ // - we login as a different user and get the UGI
+ // - use that UGI to get the tokens (see doAs block below)
+ // - copy the tokens over to the current user's credentials (this will overwrite the tokens
+ // in the current user's Credentials object for this FS).
+ // The login to the KDC happens each time new tokens are required, but this is rare enough to
+ // not have to worry about (like once every day or so). This makes the code clearer than having
+ // to login and then relogin every time (the HDFS API may not relogin since we don't use this
+ // UGI directly for HDFS communication).
+ logInfo(s"Attempting to login to KDC using principal: $principal")
+ val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+ logInfo("Successfully logged into KDC.")
+ val tempCreds = keytabLoggedInUGI.getCredentials
+ val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val dst = credentialsPath.getParent
+ keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
+ // Get a copy of the credentials
+ override def run(): Void = {
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+ null
+ }
+ })
+ // Add the temp credentials back to the original ones.
+ UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+ val remoteFs = FileSystem.get(hadoopConf)
+ // If lastCredentialsFileSuffix is 0, the AM has either just started or been restarted. If the
+ // AM was restarted, credentials files with a suffix > 0 may already exist on HDFS, so find the
+ // newest file and update lastCredentialsFileSuffix accordingly.
+ if (lastCredentialsFileSuffix == 0) {
+ hadoopUtil.listFilesSorted(
+ remoteFs, credentialsPath.getParent,
+ credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.foreach { status =>
+ lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
+ }
+ }
+ val nextSuffix = lastCredentialsFileSuffix + 1
+ val tokenPathStr =
+ sparkConf.get("spark.yarn.credentials.file") +
+ SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
+ val tokenPath = new Path(tokenPathStr)
+ val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+ logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
+ remoteFs.rename(tempTokenPath, tokenPath)
+ logInfo("Delegation token file rename complete.")
+ lastCredentialsFileSuffix = nextSuffix
+ }
+
+ def stop(): Unit = {
+ delegationTokenRenewer.shutdown()
+ }
+}
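
Stripped of the Kerberos and HDFS details, the core of AMDelegationTokenRenewer is a self-rescheduling task: run immediately if the current tokens are already past 75% of their renewal interval, otherwise schedule the run for that point, and back off for an hour on failure. A minimal standalone sketch of just that pattern follows; the object name, the fields tokenIssueTimeMs / renewalIntervalMs, and the 24-hour interval are invented for illustration and stand in for what Spark derives from the Credentials and spark.yarn.token.renewal.interval via SparkHadoopUtil.getTimeFromNowToRenewal.

    import java.util.concurrent.{Executors, TimeUnit}

    object RenewalSchedulingSketch {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      // Invented bookkeeping: when the current tokens were obtained and how long their
      // renewal interval is (24 hours here is only an example value).
      @volatile private var tokenIssueTimeMs: Long = System.currentTimeMillis()
      private val renewalIntervalMs: Long = 24L * 60 * 60 * 1000

      // Time remaining until 75% of the renewal interval has elapsed; <= 0 means renew now.
      private def millisUntilRenewal(): Long =
        tokenIssueTimeMs + (0.75 * renewalIntervalMs).toLong - System.currentTimeMillis()

      private val renewTokens: Runnable = new Runnable {
        override def run(): Unit = {
          try {
            // ... log in from the keytab, obtain fresh tokens, write them to HDFS ...
            tokenIssueTimeMs = System.currentTimeMillis()
          } catch {
            case e: Exception =>
              scheduler.schedule(this, 1, TimeUnit.HOURS) // back off, as the real renewer does
              return
          }
          scheduleRenewal() // schedule the next cycle against the new tokens
        }
      }

      def scheduleRenewal(): Unit = {
        val delay = millisUntilRenewal()
        if (delay <= 0) renewTokens.run()
        else scheduler.schedule(renewTokens, delay, TimeUnit.MILLISECONDS)
      }
    }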
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 27f804782f..e1694c1f64 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -75,6 +75,8 @@ private[spark] class ApplicationMaster(
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
+ private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+
final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
@@ -129,6 +131,15 @@ private[spark] class ApplicationMaster(
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
+ // If the credentials file config is present, we must periodically renew tokens. So create
+ // a new AMDelegationTokenRenewer
+ if (sparkConf.contains("spark.yarn.credentials.file")) {
+ delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
+ // If a principal and keytab have been set, use them to create new credentials for executors
+ // periodically.
+ delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
+ }
+
if (isClusterMode) {
runDriver(securityMgr)
} else {
@@ -193,6 +204,7 @@ private[spark] class ApplicationMaster(
logDebug("shutting down user thread")
userClassThread.interrupt()
}
+ if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
}
}
}
@@ -240,12 +252,12 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
- val driverEndpont = rpcEnv.setupEndpointRef(
+ val driverEndpoint = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
- rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, isClusterMode))
+ rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -499,6 +511,7 @@ private[spark] class ApplicationMaster(
override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
+
}
override def receive: PartialFunction[Any, Unit] = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 31ab6b491e..20ecaf092e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,9 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.{File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
@@ -36,7 +38,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{TokenIdentifier, Token}
@@ -50,8 +51,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -69,11 +70,13 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
- private val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ private var credentials: Credentials = null
private val amMemoryOverhead = args.amMemoryOverhead // MB
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
private val isClusterMode = args.isClusterMode
+
+ private var loginFromKeytab = false
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
@@ -88,6 +91,8 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
+ // Set up the credentials before doing anything else, so we don't have issues at any point.
+ setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
@@ -219,12 +224,12 @@ private[spark] class Client(
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val nns = getNameNodesToAccess(sparkConf) + dst
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
- obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
obtainTokenForHBase(hadoopConf, credentials)
@@ -243,6 +248,20 @@ private[spark] class Client(
"for alternatives.")
}
+ // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+ // HDFS, and set up the relevant environment vars, so the AM can log in again.
+ if (loginFromKeytab) {
+ logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+ " via the YARN Secure Distributed Cache.")
+ val localUri = new URI(args.keytab)
+ val localPath = getQualifiedLocalPath(localUri, hadoopConf)
+ val destinationPath = copyFileToRemote(dst, localPath, replication)
+ val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(
+ destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
+ sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
+ }
+
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -388,6 +407,28 @@ private[spark] class Client(
}
/**
+ * Get the renewal interval for tokens.
+ */
+ private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
+ // We cannot use the tokens generated above since those have YARN as the renewer. Trying to
+ // renew those will fail with an access control issue. So create new tokens with the logged-in
+ // user as the renewer.
+ val creds = new Credentials()
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
+ YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
+ nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
+ val t = creds.getAllTokens
+ .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .head
+ val newExpiration = t.renew(hadoopConf)
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ val interval = newExpiration - identifier.getIssueDate
+ logInfo(s"Renewal Interval set to $interval")
+ interval
+ }
+
+ /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
@@ -398,7 +439,16 @@ private[spark] class Client(
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-
+ if (loginFromKeytab) {
+ val remoteFs = FileSystem.get(hadoopConf)
+ val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
+ val credentialsFile = "credentials-" + UUID.randomUUID().toString
+ sparkConf.set(
+ "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
+ logInfo(s"Credentials file set to: $credentialsFile")
+ val renewalInterval = getTokenRenewalInterval(stagingDirPath)
+ sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
+ }
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -463,7 +513,6 @@ private[spark] class Client(
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
-
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
@@ -638,6 +687,24 @@ private[spark] class Client(
amContainer
}
+ def setupCredentials(): Unit = {
+ if (args.principal != null) {
+ require(args.keytab != null, "Keytab must be specified when principal is specified.")
+ logInfo("Attempting to login to the Kerberos" +
+ s" using principal: ${args.principal} and keytab: ${args.keytab}")
+ val f = new File(args.keytab)
+ // Generate a file name that can be used for the keytab file and that does not conflict
+ // with any user file.
+ val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ loginFromKeytab = true
+ sparkConf.set("spark.yarn.keytab", keytabFileName)
+ sparkConf.set("spark.yarn.principal", args.principal)
+ logInfo("Successfully logged into the KDC.")
+ }
+ credentials = UserGroupInformation.getCurrentUser.getCredentials
+ }
+
/**
* Report the state of an application until it has exited, either successfully or
* due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
@@ -994,46 +1061,6 @@ object Client extends Logging {
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
- * Get the list of namenodes the user may access.
- */
- private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get("spark.yarn.access.namenodes", "")
- .split(",")
- .map(_.trim())
- .filter(!_.isEmpty)
- .map(new Path(_))
- .toSet
- }
-
- private[yarn] def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
- delegTokenRenewer
- }
-
- /**
- * Obtains tokens for the namenodes passed in and adds them to the credentials.
- */
- private def obtainTokensForNamenodes(
- paths: Set[Path],
- conf: Configuration,
- creds: Credentials): Unit = {
- if (UserGroupInformation.isSecurityEnabled()) {
- val delegTokenRenewer = getTokenRenewer(conf)
- paths.foreach { dst =>
- val dstFs = dst.getFileSystem(conf)
- logDebug("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(delegTokenRenewer, creds)
- }
- }
- }
-
- /**
* Obtains token for the Hive metastore and adds them to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
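
To make the interval arithmetic above concrete with made-up numbers: if renewing the freshly issued HDFS token reports a new expiration 24 hours after its issue date, getTokenRenewalInterval stores 86400000 ms in spark.yarn.token.renewal.interval. The AM then logs in again and writes new tokens roughly 18 hours in (75% of the interval, per AMDelegationTokenRenewer), and each executor re-reads the credentials file roughly 19.2 hours in (80%, per ExecutorDelegationTokenUpdater).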
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1423533470..5653c9f14d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -42,6 +42,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
+ var principal: String = null
+ var keytab: String = null
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
@@ -231,6 +233,14 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
archives = value
args = tail
+ case ("--principal") :: value :: tail =>
+ principal = value
+ args = tail
+
+ case ("--keytab") :: value :: tail =>
+ keytab = value
+ args = tail
+
case Nil =>
case _ =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
new file mode 100644
index 0000000000..229c2c4d5e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+import scala.util.control.NonFatal
+
+private[spark] class ExecutorDelegationTokenUpdater(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration) extends Logging {
+
+ @volatile private var lastCredentialsFileSuffix = 0
+
+ private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+
+ private val delegationTokenRenewer =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+ // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
+ private val executorUpdaterRunnable =
+ new Runnable {
+ override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+ }
+
+ def updateCredentialsIfRequired(): Unit = {
+ try {
+ val credentialsFilePath = new Path(credentialsFile)
+ val remoteFs = FileSystem.get(hadoopConf)
+ SparkHadoopUtil.get.listFilesSorted(
+ remoteFs, credentialsFilePath.getParent,
+ credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.foreach { credentialsStatus =>
+ val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+ if (suffix > lastCredentialsFileSuffix) {
+ logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+ val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+ lastCredentialsFileSuffix = suffix
+ UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+ logInfo("Tokens updated from credentials file.")
+ } else {
+ // Check every hour to see if new credentials arrived.
+ logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+ "tokens yet, will check again in an hour.")
+ delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+ return
+ }
+ }
+ val timeFromNowToRenewal =
+ SparkHadoopUtil.get.getTimeFromNowToRenewal(
+ sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
+ if (timeFromNowToRenewal <= 0) {
+ executorUpdaterRunnable.run()
+ } else {
+ logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
+ delegationTokenRenewer.schedule(
+ executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+ }
+ } catch {
+ // Since the file may get deleted while we are reading it, catch the Exception and come
+ // back in an hour to try again
+ case NonFatal(e) =>
+ logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+ delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+ }
+ }
+
+ private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+ val stream = remoteFs.open(tokenPath)
+ try {
+ val newCredentials = new Credentials()
+ newCredentials.readTokenStorageStream(stream)
+ newCredentials
+ } finally {
+ stream.close()
+ }
+ }
+
+ def stop(): Unit = {
+ delegationTokenRenewer.shutdown()
+ }
+
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5881dc5ffa..ba91872107 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -24,18 +24,19 @@ import java.util.regex.Pattern
import scala.collection.mutable.HashMap
import scala.util.Try
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{Master, JobConf}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
/**
@@ -43,6 +44,8 @@ import org.apache.spark.util.Utils
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
+ private var tokenRenewer: Option[ExecutorDelegationTokenUpdater] = None
+
override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
dest.addCredentials(source.getCredentials())
}
@@ -82,6 +85,57 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
if (credentials != null) credentials.getSecretKey(new Text(key)) else null
}
+ /**
+ * Get the list of namenodes the user may access.
+ */
+ def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get("spark.yarn.access.namenodes", "")
+ .split(",")
+ .map(_.trim())
+ .filter(!_.isEmpty)
+ .map(new Path(_))
+ .toSet
+ }
+
+ def getTokenRenewer(conf: Configuration): String = {
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ logDebug("delegation token renewer is: " + delegTokenRenewer)
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ }
+ delegTokenRenewer
+ }
+
+ /**
+ * Obtains tokens for the namenodes passed in and adds them to the credentials.
+ */
+ def obtainTokensForNamenodes(
+ paths: Set[Path],
+ conf: Configuration,
+ creds: Credentials,
+ renewer: Option[String] = None
+ ): Unit = {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
+ paths.foreach { dst =>
+ val dstFs = dst.getFileSystem(conf)
+ logInfo("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ }
+ }
+ }
+
+ private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
+ tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
+ tokenRenewer.get.updateCredentialsIfRequired()
+ }
+
+ private[spark] override def stopExecutorDelegationTokenRenewer(): Unit = {
+ tokenRenewer.foreach(_.stop())
+ }
+
}
object YarnSparkHadoopUtil {
@@ -100,6 +154,14 @@ object YarnSparkHadoopUtil {
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = Priority.newInstance(1)
+ def get: YarnSparkHadoopUtil = {
+ val yarnMode = java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if (!yarnMode) {
+ throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!")
+ }
+ SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
+ }
/**
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.
@@ -212,3 +274,4 @@ object YarnSparkHadoopUtil {
classPathSeparatorField.get(null).asInstanceOf[String]
}
}
+
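
The two start/stop overrides above are the executor-side hook: per the commit message, SparkHadoopUtil in core gains no-op startExecutorDelegationTokenRenewer / stopExecutorDelegationTokenRenewer methods (keeping the Hadoop-1 build working), and the YARN subclass wires them to ExecutorDelegationTokenUpdater. The actual call site lives in core and is not part of this yarn/ diff; the sketch below only illustrates the expected usage, with a made-up wrapper placed in an org.apache.spark subpackage so the private[spark] methods are visible.

    package org.apache.spark.deploy.yarn.sketch

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical caller showing how the renewer is expected to bracket executor work.
    object ExecutorTokenRenewalSketch {
      def withTokenRenewal(conf: SparkConf)(body: => Unit): Unit = {
        // Only relevant when the driver has pointed executors at a credentials file.
        val tokensEnabled = conf.contains("spark.yarn.credentials.file")
        if (tokensEnabled) {
          // Resolves to YarnSparkHadoopUtil in YARN mode and starts an
          // ExecutorDelegationTokenUpdater; the base implementation is a no-op.
          SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
        }
        try {
          body
        } finally {
          if (tokensEnabled) SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
        }
      }
    }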
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index a51c2005cb..508819e242 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,57 +151,6 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
}
}
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val nns = Client.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = Client.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val caught =
- intercept[SparkException] {
- Client.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 3877da4120..d3c606e0ed 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -86,6 +86,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
tempDir = Utils.createTempDir()
logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
+ System.setProperty("SPARK_YARN_MODE", "true")
val logConfFile = new File(logConfDir, "log4j.properties")
Files.write(LOG4J_CONF, logConfFile, UTF_8)
@@ -128,6 +129,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
override def afterAll() {
yarnCluster.stop()
+ System.clearProperty("SPARK_YARN_MODE")
super.afterAll()
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 9395316b71..e10b985c3c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -27,7 +29,7 @@ import org.scalatest.{FunSuite, Matchers}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
@@ -173,4 +175,62 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
}
}
+
+ test("check access nns empty") {
+ val sparkConf = new SparkConf()
+ val util = new YarnSparkHadoopUtil
+ sparkConf.set("spark.yarn.access.namenodes", "")
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns unset") {
+ val sparkConf = new SparkConf()
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access nns space") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access two nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+ val util = new YarnSparkHadoopUtil
+ val nns = util.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val util = new YarnSparkHadoopUtil
+ val renewer = util.getTokenRenewer(hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val util = new YarnSparkHadoopUtil
+ val caught =
+ intercept[SparkException] {
+ util.getTokenRenewer(hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
}