Diffstat (limited to 'yarn')

 -rw-r--r--  yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider |   3
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                                  |  13
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                                             |  63
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala                     | 114
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala                                | 243
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala                                             |  10
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala (renamed from yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala) | 107
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala             | 105
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala                         | 130
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala                   |  74
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala                    | 110
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala                    | 129
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala                 |  57
 -rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala                   |   4
 -rw-r--r--  yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider |   1
 -rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala                           |  97
 -rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala        | 150
 -rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala               |  71
 18 files changed, 958 insertions(+), 523 deletions(-)
diff --git a/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000000..22ead56d23
--- /dev/null
+++ b/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1,3 @@
+org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
+org.apache.spark.deploy.yarn.security.HiveCredentialProvider
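Note: the three entries above are discovered via Java's ServiceLoader. As a rough, hypothetical sketch (not part of this change), an external credential provider would implement the ServiceCredentialProvider trait added by this patch and list its class name in its own META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider file. The ExampleCredentialProvider below and its package are made up, and its members are assumed to mirror those used by the built-in providers in this diff.

    package com.example.security

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

    class ExampleCredentialProvider extends ServiceCredentialProvider {
      // Used in spark.yarn.security.credentials.<serviceName>.enabled
      override def serviceName: String = "example"

      // Only ask for tokens when the target service actually needs them.
      override def credentialsRequired(hadoopConf: Configuration): Boolean = true

      override def obtainCredentials(
          hadoopConf: Configuration,
          sparkConf: SparkConf,
          creds: Credentials): Option[Long] = {
        // Obtain a token for the external service and add it to `creds` here.
        // Return Some(nextRenewalTimeMillis) if the token is renewable, otherwise None.
        None
      }
    }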
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c371ad616a..614278c8b2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -35,6 +35,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc._
@@ -112,7 +113,7 @@ private[spark] class ApplicationMaster(
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
- private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+ private var credentialRenewer: AMCredentialRenewer = _
// Load the list of localized files set by the client. This is used when launching executors,
// and is loaded here so that these configs don't pollute the Web UI's environment page in
@@ -235,10 +236,11 @@ private[spark] class ApplicationMaster(
// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
- delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
- delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
+ credentialRenewer =
+ new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+ credentialRenewer.scheduleLoginFromKeytab()
}
if (isClusterMode) {
@@ -305,7 +307,10 @@ private[spark] class ApplicationMaster(
logDebug("shutting down user thread")
userClassThread.interrupt()
}
- if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
+ if (!inShutdown && credentialRenewer != null) {
+ credentialRenewer.stop()
+ credentialRenewer = null
+ }
}
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 348f9bf94a..e3572d781b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
- OutputStreamWriter}
+import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
@@ -35,7 +34,6 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -52,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -122,6 +121,8 @@ private[spark] class Client(
private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+ private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
def reportLauncherState(state: SparkAppHandle.State): Unit = {
launcherBackend.setState(state)
}
@@ -390,8 +391,31 @@ private[spark] class Client(
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
val fs = destDir.getFileSystem(hadoopConf)
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
- YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
+
+ // Merge credentials obtained from registered providers
+ val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
+
+ if (credentials != null) {
+ logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
+ }
+
+ // If we log in with a principal and keytab, the obtained credentials can be renewed at some
+ // time after the current time, so pass the next renewal and update times on to the credential
+ // renewer and updater.
+ if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
+ nearestTimeOfNextRenewal != Long.MaxValue) {
+
+ // The valid renewal time is 75% of the time until the next renewal, and the valid update time
+ // is slightly later (80% of the time until the next renewal). This makes sure credentials are
+ // renewed and updated before they expire.
+ val currTime = System.currentTimeMillis()
+ val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
+ val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
+
+ sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
+ sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
+ }
+
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
@@ -400,11 +424,6 @@ private[spark] class Client(
// same name but different path files are added multiple time, YARN will fail to launch
// containers for the app with an internal error.
val distributedNames = new HashSet[String]
- YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
- YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
- if (credentials != null) {
- logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
- }
val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
.getOrElse(fs.getDefaultReplication(destDir))
@@ -717,28 +736,6 @@ private[spark] class Client(
}
/**
- * Get the renewal interval for tokens.
- */
- private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
- // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
- // those will fail with an access control issue. So create new tokens with the logged in
- // user as renewer.
- val creds = new Credentials()
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
- YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
- nns, hadoopConf, creds, sparkConf.get(PRINCIPAL))
- val t = creds.getAllTokens.asScala
- .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- .head
- val newExpiration = t.renew(hadoopConf)
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal Interval set to $interval")
- interval
- }
-
- /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(
@@ -754,8 +751,6 @@ private[spark] class Client(
val credentialsFile = "credentials-" + UUID.randomUUID().toString
sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
- val renewalInterval = getTokenRenewalInterval(stagingDirPath)
- sparkConf.set(TOKEN_RENEWAL_INTERVAL, renewalInterval)
}
// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
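Note: to make the 75%/80% arithmetic above concrete, here is a sketch with made-up numbers (variable names follow the hunk above). The point is that renewal happens before the updater looks for new token files, and both happen well before the old tokens expire.

    // Hypothetical: the earliest token obtained by the providers expires 10 hours from now.
    val currTime = System.currentTimeMillis()
    val nearestTimeOfNextRenewal = currTime + 10L * 60 * 60 * 1000

    val renewalTime = ((nearestTimeOfNextRenewal - currTime) * 0.75 + currTime).toLong // now + 7.5 h
    val updateTime = ((nearestTimeOfNextRenewal - currTime) * 0.8 + currTime).toLong   // now + 8 h
    // renewalTime < updateTime < nearestTimeOfNextRenewal: the AM writes new tokens before
    // executors try to pick them up, and both happen before the old tokens expire.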
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
deleted file mode 100644
index 3aa64071d4..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-private[spark] class ExecutorDelegationTokenUpdater(
- sparkConf: SparkConf,
- hadoopConf: Configuration) extends Logging {
-
- @volatile private var lastCredentialsFileSuffix = 0
-
- private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
- private val freshHadoopConf =
- SparkHadoopUtil.get.getConfBypassingFSCache(
- hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
- private val delegationTokenRenewer =
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
- // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
- private val executorUpdaterRunnable =
- new Runnable {
- override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
- }
-
- def updateCredentialsIfRequired(): Unit = {
- try {
- val credentialsFilePath = new Path(credentialsFile)
- val remoteFs = FileSystem.get(freshHadoopConf)
- SparkHadoopUtil.get.listFilesSorted(
- remoteFs, credentialsFilePath.getParent,
- credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .lastOption.foreach { credentialsStatus =>
- val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
- if (suffix > lastCredentialsFileSuffix) {
- logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
- val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
- lastCredentialsFileSuffix = suffix
- UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
- logInfo("Tokens updated from credentials file.")
- } else {
- // Check every hour to see if new credentials arrived.
- logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
- "tokens yet, will check again in an hour.")
- delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
- return
- }
- }
- val timeFromNowToRenewal =
- SparkHadoopUtil.get.getTimeFromNowToRenewal(
- sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
- if (timeFromNowToRenewal <= 0) {
- // We just checked for new credentials but none were there, wait a minute and retry.
- // This handles the shutdown case where the staging directory may have been removed(see
- // SPARK-12316 for more details).
- delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.MINUTES)
- } else {
- logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
- delegationTokenRenewer.schedule(
- executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
- }
- } catch {
- // Since the file may get deleted while we are reading it, catch the Exception and come
- // back in an hour to try again
- case NonFatal(e) =>
- logWarning("Error while trying to update credentials, will try again in 1 hour", e)
- delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
- }
- }
-
- private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
- val stream = remoteFs.open(tokenPath)
- try {
- val newCredentials = new Credentials()
- newCredentials.readTokenStorageStream(stream)
- newCredentials
- } finally {
- stream.close()
- }
- }
-
- def stop(): Unit = {
- delegationTokenRenewer.shutdown()
- }
-
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 156a7a30ea..cc53b1b06e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,25 +18,18 @@
package org.apache.spark.deploy.yarn
import java.io.File
-import java.lang.reflect.UndeclaredThrowableException
import java.nio.charset.StandardCharsets.UTF_8
-import java.security.PrivilegedExceptionAction
import java.util.regex.Matcher
import java.util.regex.Pattern
-import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.reflect.runtime._
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.{JobConf, Master}
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
@@ -45,7 +38,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater}
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils
@@ -55,7 +48,7 @@ import org.apache.spark.util.Utils
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
- private var tokenRenewer: Option[ExecutorDelegationTokenUpdater] = None
+ private var credentialUpdater: CredentialUpdater = _
override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
dest.addCredentials(source.getCredentials())
@@ -96,237 +89,23 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
if (credentials != null) credentials.getSecretKey(new Text(key)) else null
}
- /**
- * Get the list of namenodes the user may access.
- */
- def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get(NAMENODES_TO_ACCESS)
- .map(new Path(_))
- .toSet
- }
-
- def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
- delegTokenRenewer
- }
-
- /**
- * Obtains tokens for the namenodes passed in and adds them to the credentials.
- */
- def obtainTokensForNamenodes(
- paths: Set[Path],
- conf: Configuration,
- creds: Credentials,
- renewer: Option[String] = None
- ): Unit = {
- if (UserGroupInformation.isSecurityEnabled()) {
- val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
- paths.foreach { dst =>
- val dstFs = dst.getFileSystem(conf)
- logInfo("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(delegTokenRenewer, creds)
- }
- }
- }
-
- /**
- * Obtains token for the Hive metastore and adds them to the credentials.
- */
- def obtainTokenForHiveMetastore(
- sparkConf: SparkConf,
- conf: Configuration,
- credentials: Credentials) {
- if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
- YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
- credentials.addToken(new Text("hive.server2.delegation.token"), _)
- }
- }
+ private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = {
+ credentialUpdater =
+ new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
+ credentialUpdater.start()
}
- /**
- * Obtain a security token for HBase.
- */
- def obtainTokenForHBase(
- sparkConf: SparkConf,
- conf: Configuration,
- credentials: Credentials): Unit = {
- if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) {
- YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
- credentials.addToken(token.getService, token)
- logInfo("Added HBase security token to credentials.")
- }
+ private[spark] override def stopCredentialUpdater(): Unit = {
+ if (credentialUpdater != null) {
+ credentialUpdater.stop()
+ credentialUpdater = null
}
}
- /**
- * Return whether delegation tokens should be retrieved for the given service when security is
- * enabled. By default, tokens are retrieved, but that behavior can be changed by setting
- * a service-specific configuration.
- */
- private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
- conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
- }
-
- private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
- tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
- tokenRenewer.get.updateCredentialsIfRequired()
- }
-
- private[spark] override def stopExecutorDelegationTokenRenewer(): Unit = {
- tokenRenewer.foreach(_.stop())
- }
-
private[spark] def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}
-
- /**
- * Obtains token for the Hive metastore, using the current user as the principal.
- * Some exceptions are caught and downgraded to a log message.
- * @param conf hadoop configuration; the Hive configuration will be based on this
- * @return a token, or `None` if there's no need for a token (no metastore URI or principal
- * in the config), or if a binding exception was caught and downgraded.
- */
- def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
- try {
- obtainTokenForHiveMetastoreInner(conf)
- } catch {
- case e: ClassNotFoundException =>
- logInfo(s"Hive class not found $e")
- logDebug("Hive class not found", e)
- None
- }
- }
-
- /**
- * Inner routine to obtains token for the Hive metastore; exceptions are raised on any problem.
- * @param conf hadoop configuration; the Hive configuration will be based on this.
- * @param username the username of the principal requesting the delegating token.
- * @return a delegation token
- */
- private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
- Option[Token[DelegationTokenIdentifier]] = {
- val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-
- // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
- // to a Configuration and used without reflection
- val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
- // using the (Configuration, Class) constructor allows the current configuration to be included
- // in the hive config.
- val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
- classOf[Object].getClass)
- val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
- val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
-
- // Check for local metastore
- if (metastoreUri.nonEmpty) {
- val principalKey = "hive.metastore.kerberos.principal"
- val principal = hiveConf.getTrimmed(principalKey, "")
- require(principal.nonEmpty, "Hive principal $principalKey undefined")
- val currentUser = UserGroupInformation.getCurrentUser()
- logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
- s"$principal at $metastoreUri")
- val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
- val closeCurrent = hiveClass.getMethod("closeCurrent")
- try {
- // get all the instance methods before invoking any
- val getDelegationToken = hiveClass.getMethod("getDelegationToken",
- classOf[String], classOf[String])
- val getHive = hiveClass.getMethod("get", hiveConfClass)
-
- doAsRealUser {
- val hive = getHive.invoke(null, hiveConf)
- val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
- .asInstanceOf[String]
- val hive2Token = new Token[DelegationTokenIdentifier]()
- hive2Token.decodeFromUrlString(tokenStr)
- Some(hive2Token)
- }
- } finally {
- Utils.tryLogNonFatalError {
- closeCurrent.invoke(null)
- }
- }
- } else {
- logDebug("HiveMetaStore configured in localmode")
- None
- }
- }
-
- /**
- * Obtain a security token for HBase.
- *
- * Requirements
- *
- * 1. `"hbase.security.authentication" == "kerberos"`
- * 2. The HBase classes `HBaseConfiguration` and `TokenUtil` could be loaded
- * and invoked.
- *
- * @param conf Hadoop configuration; an HBase configuration is created
- * from this.
- * @return a token if the requirements were met, `None` if not.
- */
- def obtainTokenForHBase(conf: Configuration): Option[Token[TokenIdentifier]] = {
- try {
- obtainTokenForHBaseInner(conf)
- } catch {
- case e: ClassNotFoundException =>
- logInfo(s"HBase class not found $e")
- logDebug("HBase class not found", e)
- None
- }
- }
-
- /**
- * Obtain a security token for HBase if `"hbase.security.authentication" == "kerberos"`
- *
- * @param conf Hadoop configuration; an HBase configuration is created
- * from this.
- * @return a token if one was needed
- */
- def obtainTokenForHBaseInner(conf: Configuration): Option[Token[TokenIdentifier]] = {
- val mirror = universe.runtimeMirror(getClass.getClassLoader)
- val confCreate = mirror.classLoader.
- loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
- getMethod("create", classOf[Configuration])
- val obtainToken = mirror.classLoader.
- loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
- getMethod("obtainToken", classOf[Configuration])
- val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
- if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
- logDebug("Attempting to fetch HBase security token.")
- Some(obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]])
- } else {
- None
- }
- }
-
- /**
- * Run some code as the real logged in user (which may differ from the current user, for
- * example, when using proxying).
- */
- private def doAsRealUser[T](fn: => T): T = {
- val currentUser = UserGroupInformation.getCurrentUser()
- val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
-
- // For some reason the Scala-generated anonymous class ends up causing an
- // UndeclaredThrowableException, even if you annotate the method with @throws.
- try {
- realUser.doAs(new PrivilegedExceptionAction[T]() {
- override def run(): T = fn
- })
- } catch {
- case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
- }
- }
-
}
object YarnSparkHadoopUtil {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 49c0177ab2..ca8c89043a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -319,6 +319,16 @@ package object config {
.stringConf
.createOptional
+ private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime")
+ .internal()
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(Long.MaxValue)
+
+ private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime")
+ .internal()
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(Long.MaxValue)
+
// The list of cache-related config entries. This is used by Client and the AM to clean
// up the environment so that these settings do not appear on the web UI.
private[yarn] val CACHE_CONFIGS = Seq(
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 310a7a6b05..7e76f402db 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.deploy.yarn
+package org.apache.spark.deploy.yarn.security
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{Executors, TimeUnit}
@@ -25,39 +25,42 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.ThreadUtils
-/*
+/**
* The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while writing to secure HDFS. The
- * scheduleLoginFromKeytab method is called on the driver when the
- * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original delegation tokens used for the container
- * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
+ * Streaming apps can run without interruption while accessing secured services. The
+ * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
+ * This method wakes up a thread that logs into the KDC
+ * once 75% of the renewal interval of the original credentials used for the container
+ * has elapsed. It then obtains new credentials and writes them to HDFS in a
* pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
- * to a new file, with a monotonically increasing suffix). After this, the credentials are
- * updated once 75% of the new tokens renewal interval has elapsed.
+ * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
+ * - each update goes to a new file, with a monotonically increasing suffix), where timestamp1,
+ * timestamp2, etc. indicate the time of the next update for the CredentialUpdater.
+ * After this, the credentials are renewed once 75% of the renewal interval of the new tokens has elapsed.
*
- * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
- * validity of the original tokens has elapsed. At that time the executor finds the
- * credentials file with the latest timestamp and checks if it has read those credentials
- * before (by keeping track of the suffix of the last file it read). If a new file has
+ * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
+ * called once 80% of the validity of the original credentials has elapsed. At that time the
+ * executor finds the credentials file with the latest timestamp and checks if it has read those
+ * credentials before (by keeping track of the suffix of the last file it read). If a new file has
* appeared, it will read the credentials and update the currently running UGI with it. This
* process happens again once 80% of the validity of this has expired.
*/
-private[yarn] class AMDelegationTokenRenewer(
+private[yarn] class AMCredentialRenewer(
sparkConf: SparkConf,
- hadoopConf: Configuration) extends Logging {
+ hadoopConf: Configuration,
+ credentialManager: ConfigurableCredentialManager) extends Logging {
private var lastCredentialsFileSuffix = 0
- private val delegationTokenRenewer =
+ private val credentialRenewer =
Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+ ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
private val hadoopUtil = YarnSparkHadoopUtil.get
@@ -67,6 +70,8 @@ private[yarn] class AMDelegationTokenRenewer(
private val freshHadoopConf =
hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
+ @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+
/**
* Schedule a login from the keytab and principal set using the --principal and --keytab
* arguments to spark-submit. This login happens only when the credentials of the current user
@@ -79,44 +84,43 @@ private[yarn] class AMDelegationTokenRenewer(
val keytab = sparkConf.get(KEYTAB).get
/**
- * Schedule re-login and creation of new tokens. If tokens have already expired, this method
- * will synchronously create new ones.
+ * Schedule re-login and creation of new credentials. If credentials have already expired, this
+ * method will synchronously create new ones.
*/
def scheduleRenewal(runnable: Runnable): Unit = {
- val credentials = UserGroupInformation.getCurrentUser.getCredentials
- val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
// Run now!
- if (renewalInterval <= 0) {
- logInfo("HDFS tokens have expired, creating new tokens now.")
+ val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+ if (remainingTime <= 0) {
+ logInfo("Credentials have expired, creating new ones now.")
runnable.run()
} else {
- logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
- delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
+ logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+ credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
}
}
- // This thread periodically runs on the driver to update the delegation tokens on HDFS.
- val driverTokenRenewerRunnable =
+ // This thread periodically runs on the AM to update the credentials on HDFS.
+ val credentialRenewerRunnable =
new Runnable {
override def run(): Unit = {
try {
- writeNewTokensToHDFS(principal, keytab)
+ writeNewCredentialsToHDFS(principal, keytab)
cleanupOldFiles()
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
logWarning("Failed to write out new credentials to HDFS, will try again in an " +
"hour! If this happens too often tasks will fail.", e)
- delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
+ credentialRenewer.schedule(this, 1, TimeUnit.HOURS)
return
}
scheduleRenewal(this)
}
}
- // Schedule update of credentials. This handles the case of updating the tokens right now
+ // Schedule update of credentials. This handles the case of updating the credentials right now
// as well, since the renewal interval will be 0, and the thread will get scheduled
// immediately.
- scheduleRenewal(driverTokenRenewerRunnable)
+ scheduleRenewal(credentialRenewerRunnable)
}
// Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
@@ -136,12 +140,12 @@ private[yarn] class AMDelegationTokenRenewer(
} catch {
// Such errors are not fatal, so don't throw. Make sure they are logged though
case e: Exception =>
- logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
- "warnings there may be an issue with your HDFS cluster.", e)
+ logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
+ "such warnings there may be an issue with your HDFS cluster.", e)
}
}
- private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
+ private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
// Keytab is copied by YARN to the working directory of the AM, so full path is
// not needed.
@@ -166,16 +170,33 @@ private[yarn] class AMDelegationTokenRenewer(
val tempCreds = keytabLoggedInUGI.getCredentials
val credentialsPath = new Path(credentialsFile)
val dst = credentialsPath.getParent
+ var nearestNextRenewalTime = Long.MaxValue
keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
// Get a copy of the credentials
override def run(): Void = {
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
- hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
- hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, tempCreds)
- hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds)
+ nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds)
null
}
})
+
+ val currTime = System.currentTimeMillis()
+ val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) {
+ // If the next renewal time is earlier than the current time, set the next renewal time to the
+ // current time, which triggers the next renewal immediately, and also set the next update time
+ // to the current time. The gap that remains between token renewal and update could still
+ // potentially introduce issues.
+ logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " +
+ s"current time ($currTime), which is unexpected, please check your credential renewal " +
+ "related configurations in the target services.")
+ timeOfNextRenewal = currTime
+ currTime
+ } else {
+ // Next valid renewal time is about 75% of credential renewal time, and update time is
+ // slightly later than valid renewal time (80% of renewal time).
+ timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
+ ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
+ }
+
// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
val remoteFs = FileSystem.get(freshHadoopConf)
@@ -191,10 +212,14 @@ private[yarn] class AMDelegationTokenRenewer(
}
}
val nextSuffix = lastCredentialsFileSuffix + 1
+
val tokenPathStr =
- credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
+ credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
+ timeOfNextUpdate.toLong.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
+ nextSuffix
val tokenPath = new Path(tokenPathStr)
val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+
logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
val credentials = UserGroupInformation.getCurrentUser.getCredentials
credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
@@ -205,6 +230,6 @@ private[yarn] class AMDelegationTokenRenewer(
}
def stop(): Unit = {
- delegationTokenRenewer.shutdown()
+ credentialRenewer.shutdown()
}
}
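Note: a sketch of the file name assembled by writeNewCredentialsToHDFS above, assuming SPARK_YARN_CREDS_COUNTER_DELIM is "-" and using made-up values for the prefix, timestamp and suffix.

    // <credentialsFile>-<timeOfNextUpdate>-<nextSuffix>, written under a temporary extension first.
    val credentialsFile = "hdfs:///user/spark/.sparkStaging/app_1/credentials-uuid" // hypothetical
    val timeOfNextUpdate = 1470000000000L // absolute time in millis, embedded for the updater
    val nextSuffix = 3                    // monotonically increasing counter

    val tokenPathStr = credentialsFile + "-" + timeOfNextUpdate + "-" + nextSuffix
    // => hdfs:///user/spark/.sparkStaging/app_1/credentials-uuid-1470000000000-3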
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
new file mode 100644
index 0000000000..c4c07b4930
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A ConfigurableCredentialManager to manage all the registered credential providers and offer
+ * APIs for other modules to obtain credentials as well as renewal time. By default
+ * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * be loaded if not explicitly disabled; any plugged-in credential provider that wants to be
+ * managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
+ * interface and be registered in resources/META-INF/services so it can be picked up by ServiceLoader.
+ *
+ * Each credential provider is also controlled by
+ * spark.yarn.security.credentials.{service}.enabled; it will not be loaded if set to false.
+ */
+private[yarn] final class ConfigurableCredentialManager(
+ sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
+ private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
+ private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
+
+ // Maintain all the registered credential providers
+ private val credentialProviders = {
+ val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
+ Utils.getContextOrSparkClassLoader).asScala
+
+ // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
+ providers.filter { p =>
+ sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
+ .orElse {
+ sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
+ logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
+ s"using ${providerEnabledConfig.format(p.serviceName)} instead")
+ c
+ }
+ }.map(_.toBoolean).getOrElse(true)
+ }.map { p => (p.serviceName, p) }.toMap
+ }
+
+ /**
+ * Get credential provider for the specified service.
+ */
+ def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
+ credentialProviders.get(service)
+ }
+
+ /**
+ * Obtain credentials from all the registered providers.
+ * @return the nearest time of the next renewal; Long.MaxValue if none of the credentials are
+ * renewable, otherwise the earliest renewal time among all obtained credentials.
+ */
+ def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
+ credentialProviders.values.flatMap { provider =>
+ if (provider.credentialsRequired(hadoopConf)) {
+ provider.obtainCredentials(hadoopConf, sparkConf, creds)
+ } else {
+ logDebug(s"Service ${provider.serviceName} does not require a token." +
+ s" Check your configuration to see if security is disabled or not.")
+ None
+ }
+ }.foldLeft(Long.MaxValue)(math.min)
+ }
+
+ /**
+ * Create an [[AMCredentialRenewer]] instance; the caller is responsible for stopping the
+ * instance when it is no longer used. The AM uses it to renew credentials periodically.
+ */
+ def credentialRenewer(): AMCredentialRenewer = {
+ new AMCredentialRenewer(sparkConf, hadoopConf, this)
+ }
+
+ /**
+ * Create a [[CredentialUpdater]] instance; the caller is responsible for stopping the instance
+ * when it is no longer used. Executors and the driver (client mode) use it to update
+ * credentials periodically.
+ */
+ def credentialUpdater(): CredentialUpdater = {
+ new CredentialUpdater(sparkConf, hadoopConf, this)
+ }
+}
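Note: a usage sketch of the manager defined above (concrete configuration values are hypothetical). A provider can be disabled per service name, and obtainCredentials returns the earliest renewal time reported by the remaining providers, or Long.MaxValue when nothing is renewable.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager

    val sparkConf = new SparkConf()
      .set("spark.yarn.security.credentials.hbase.enabled", "false") // skip the "hbase" provider

    val hadoopConf = new Configuration()
    val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)

    val creds = new Credentials()
    val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
    // Long.MaxValue if none of the obtained credentials are renewable.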
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
new file mode 100644
index 0000000000..5df4fbd9c1
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class CredentialUpdater(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration,
+ credentialManager: ConfigurableCredentialManager) extends Logging {
+
+ @volatile private var lastCredentialsFileSuffix = 0
+
+ private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+ private val freshHadoopConf =
+ SparkHadoopUtil.get.getConfBypassingFSCache(
+ hadoopConf, new Path(credentialsFile).toUri.getScheme)
+
+ private val credentialUpdater =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+ // This thread wakes up and picks up new credentials from HDFS, if any.
+ private val credentialUpdaterRunnable =
+ new Runnable {
+ override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+ }
+
+ /** Start the credential updater task */
+ def start(): Unit = {
+ val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+ val remainingTime = startTime - System.currentTimeMillis()
+ if (remainingTime <= 0) {
+ credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
+ } else {
+ logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.")
+ credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ private def updateCredentialsIfRequired(): Unit = {
+ val timeToNextUpdate = try {
+ val credentialsFilePath = new Path(credentialsFile)
+ val remoteFs = FileSystem.get(freshHadoopConf)
+ SparkHadoopUtil.get.listFilesSorted(
+ remoteFs, credentialsFilePath.getParent,
+ credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.map { credentialsStatus =>
+ val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+ if (suffix > lastCredentialsFileSuffix) {
+ logInfo("Reading new credentials from " + credentialsStatus.getPath)
+ val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+ lastCredentialsFileSuffix = suffix
+ UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+ logInfo("Credentials updated from credentials file.")
+
+ val remainingTime = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) -
+ System.currentTimeMillis()
+ if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime
+ } else {
+ // If current credential file is older than expected, sleep 1 hour and check again.
+ TimeUnit.HOURS.toMillis(1)
+ }
+ }.getOrElse {
+ // Wait for 1 minute to check again if there's no credential file currently
+ TimeUnit.MINUTES.toMillis(1)
+ }
+ } catch {
+ // Since the file may get deleted while we are reading it, catch the Exception and come
+ // back in an hour to try again
+ case NonFatal(e) =>
+ logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+ TimeUnit.HOURS.toMillis(1)
+ }
+
+ credentialUpdater.schedule(
+ credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS)
+ }
+
+ private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+ val stream = remoteFs.open(tokenPath)
+ try {
+ val newCredentials = new Credentials()
+ newCredentials.readTokenStorageStream(stream)
+ newCredentials
+ } finally {
+ stream.close()
+ }
+ }
+
+ private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = {
+ val name = credentialsPath.getName
+ val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
+ val slice = name.substring(0, index)
+ val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
+ name.substring(last2index + 1, index).toLong
+ }
+
+ def stop(): Unit = {
+ credentialUpdater.shutdown()
+ }
+
+}
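Note: getTimeOfNextUpdateFromFileName above reads the second-to-last delimited field of the credentials file name as the absolute next-update time. A small worked example with a hypothetical file name, assuming the delimiter is "-":

    val name = "credentials-uuid-1470000000000-3" // <prefix>-<timeOfNextUpdate>-<suffix>

    val index = name.lastIndexOf("-")       // delimiter in front of the suffix "3"
    val slice = name.substring(0, index)    // "credentials-uuid-1470000000000"
    val last2index = slice.lastIndexOf("-") // delimiter in front of the timestamp
    val timeOfNextUpdate = name.substring(last2index + 1, index).toLong // 1470000000000L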
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
new file mode 100644
index 0000000000..5571df09a2
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
+
+ override def serviceName: String = "hbase"
+
+ override def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ try {
+ val mirror = universe.runtimeMirror(getClass.getClassLoader)
+ val obtainToken = mirror.classLoader.
+ loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
+ getMethod("obtainToken", classOf[Configuration])
+
+ logDebug("Attempting to fetch HBase security token.")
+ val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
+ .asInstanceOf[Token[_ <: TokenIdentifier]]
+ logInfo(s"Get token from HBase: ${token.toString}")
+ creds.addToken(token.getService, token)
+ } catch {
+ case NonFatal(e) =>
+ logDebug(s"Failed to get token from service $serviceName", e)
+ }
+
+ None
+ }
+
+ override def credentialsRequired(hadoopConf: Configuration): Boolean = {
+ hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
+ }
+
+ private def hbaseConf(conf: Configuration): Configuration = {
+ try {
+ val mirror = universe.runtimeMirror(getClass.getClassLoader)
+ val confCreate = mirror.classLoader.
+ loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
+ getMethod("create", classOf[Configuration])
+ confCreate.invoke(null, conf).asInstanceOf[Configuration]
+ } catch {
+ case NonFatal(e) =>
+ logDebug("Fail to invoke HBaseConfiguration", e)
+ conf
+ }
+ }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
new file mode 100644
index 0000000000..8d06d735ba
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+ // Token renewal interval, set on the first call to obtainCredentials; if it is None,
+ // no token renewer was specified, so the token renewal interval cannot be determined.
+ private var tokenRenewalInterval: Option[Long] = null
+
+ override val serviceName: String = "hdfs"
+
+ override def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ // NameNode to access, used to get tokens from different FileSystems
+ nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+ val dstFs = dst.getFileSystem(hadoopConf)
+ logInfo("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+ }
+
+ // Get the token renewal interval if it is not set. It will only be called once.
+ if (tokenRenewalInterval == null) {
+ tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
+ }
+
+ // Get the time of next renewal.
+ tokenRenewalInterval.map { interval =>
+ creds.getAllTokens.asScala
+ .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .map { t =>
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ identifier.getIssueDate + interval
+ }.foldLeft(0L)(math.max)
+ }
+ }
+
+ private def getTokenRenewalInterval(
+ hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
+ // We cannot use tokens whose renewer is yarn; trying to renew
+ // those will fail with an access control issue. So create new tokens with the logged-in
+ // user as the renewer.
+ sparkConf.get(PRINCIPAL).map { renewer =>
+ val creds = new Credentials()
+ nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+ val dstFs = dst.getFileSystem(hadoopConf)
+ dstFs.addDelegationTokens(renewer, creds)
+ }
+ val t = creds.getAllTokens.asScala
+ .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .head
+ val newExpiration = t.renew(hadoopConf)
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ val interval = newExpiration - identifier.getIssueDate
+ logInfo(s"Renewal Interval is $interval")
+ interval
+ }
+ }
+
+ private def getTokenRenewer(conf: Configuration): String = {
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ logDebug("delegation token renewer is: " + delegTokenRenewer)
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ }
+
+ delegTokenRenewer
+ }
+
+ private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
+ sparkConf.get(STAGING_DIR).map(new Path(_))
+ .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
+ }
+}
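Note: a worked example of the renewal time returned above, with made-up values. The renewal interval is learned once by renewing a freshly obtained token, and the result is the latest "issue date + interval" across the HDFS delegation tokens in the credentials.

    val interval = 24L * 60 * 60 * 1000 // hypothetical: newExpiration - issueDate = 1 day

    // Hypothetical issue dates of the HDFS delegation tokens that were just obtained.
    val issueDates = Seq(1470000000000L, 1470000300000L)

    val timeOfNextRenewal = issueDates.map(_ + interval).foldLeft(0L)(math.max)
    // => 1470086700000L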
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
new file mode 100644
index 0000000000..16d8fc32bb
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.security.PrivilegedExceptionAction
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging {
+
+ override def serviceName: String = "hive"
+
+ private def hiveConf(hadoopConf: Configuration): Configuration = {
+ try {
+ val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+ // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
+ // to a Configuration and used without reflection
+ val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+ // using the (Configuration, Class) constructor allows the current configuration to be
+ // included in the hive config.
+ val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
+ classOf[Object].getClass)
+ ctor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration]
+ } catch {
+ case NonFatal(e) =>
+ logDebug("Fail to create Hive Configuration", e)
+ hadoopConf
+ }
+ }
+
+ override def credentialsRequired(hadoopConf: Configuration): Boolean = {
+ UserGroupInformation.isSecurityEnabled &&
+ hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
+ }
+
+ override def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ val conf = hiveConf(hadoopConf)
+
+ val principalKey = "hive.metastore.kerberos.principal"
+ val principal = conf.getTrimmed(principalKey, "")
+ require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+ val metastoreUri = conf.getTrimmed("hive.metastore.uris", "")
+ require(metastoreUri.nonEmpty, "Hive metastore uri undefined")
+
+ val currentUser = UserGroupInformation.getCurrentUser()
+ logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
+ s"$principal at $metastoreUri")
+
+ val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+ val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+ val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+ val closeCurrent = hiveClass.getMethod("closeCurrent")
+
+ try {
+ // get all the instance methods before invoking any
+ val getDelegationToken = hiveClass.getMethod("getDelegationToken",
+ classOf[String], classOf[String])
+ val getHive = hiveClass.getMethod("get", hiveConfClass)
+
+ doAsRealUser {
+ val hive = getHive.invoke(null, conf)
+ val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
+ .asInstanceOf[String]
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
+ creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
+ }
+ } catch {
+ case NonFatal(e) =>
+ logDebug(s"Fail to get token from service $serviceName", e)
+ } finally {
+ Utils.tryLogNonFatalError {
+ closeCurrent.invoke(null)
+ }
+ }
+
+ None
+ }
+
+ /**
+ * Run some code as the real logged in user (which may differ from the current user, for
+ * example, when using proxying).
+ */
+ private def doAsRealUser[T](fn: => T): T = {
+ val currentUser = UserGroupInformation.getCurrentUser()
+ val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+ // For some reason the Scala-generated anonymous class ends up causing an
+ // UndeclaredThrowableException, even if you annotate the method with @throws.
+ try {
+ realUser.doAs(new PrivilegedExceptionAction[T]() {
+ override def run(): T = fn
+ })
+ } catch {
+ case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+ }
+ }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
new file mode 100644
index 0000000000..4e3fcce8db
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+
+/**
+ * A credential provider for a service. Users must implement this trait if they need to access a
+ * secure service from Spark.
+ */
+trait ServiceCredentialProvider {
+
+  /**
+   * Name of the service to provide credentials for. This name should be unique; Spark uses it
+   * internally to differentiate credential providers.
+   */
+ def serviceName: String
+
+  /**
+   * Decide whether credentials are required for this service. By default this is based on
+   * whether Hadoop security is enabled.
+   */
+ def credentialsRequired(hadoopConf: Configuration): Boolean = {
+ UserGroupInformation.isSecurityEnabled
+ }
+
+  /**
+   * Obtain credentials for this service and get the time of the next renewal.
+   * @param hadoopConf Configuration of the current Hadoop-compatible system.
+   * @param sparkConf Spark configuration.
+   * @param creds Credentials to add tokens and security keys to.
+   * @return If the obtained credentials are renewable, the time of the next renewal; otherwise
+   *         None should be returned.
+   */
+ def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long]
+}
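
To make the extension point concrete, below is a minimal sketch of a user-supplied provider; the package, class name and service name are hypothetical. As the META-INF/services files elsewhere in this patch show, an implementation is picked up by listing its fully qualified class name in META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider on the classpath, and it can be disabled via spark.yarn.security.credentials.<serviceName>.enabled (see the test suite below).

package com.example.spark.security // hypothetical package, for illustration only

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// A minimal sketch of a custom provider; a real implementation would obtain a
// delegation token from its service and add it to `creds`.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // Unique service name; credentialsRequired keeps the default (Hadoop security enabled).
  override def serviceName: String = "my-service"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch tokens for the external service and add them to `creds` here.
    // Return Some(nextRenewalTime) if the token is renewable, otherwise None.
    None
  }
}
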
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 56dc0004d0..d8b36c5fea 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -65,7 +65,7 @@ private[spark] class YarnClientSchedulerBackend(
// reads the credentials from HDFS, just like the executors and updates its own credentials
// cache.
if (conf.contains("spark.yarn.credentials.file")) {
- YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
+ YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
}
monitorThread = asyncMonitorApplication()
monitorThread.start()
@@ -149,7 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
client.reportLauncherState(SparkAppHandle.State.FINISHED)
super.stop()
- YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
+ YarnSparkHadoopUtil.get.stopCredentialUpdater()
client.stop()
logInfo("Stopped")
}
diff --git a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000000..d0ef5efa36
--- /dev/null
+++ b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1 @@
+org.apache.spark.deploy.yarn.security.TestCredentialProvider
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index fe09808ae5..7fbbe12609 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,13 +18,9 @@
package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
-import java.lang.reflect.InvocationTargetException
import java.nio.charset.StandardCharsets
import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.io.Text
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -32,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -173,64 +169,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
}
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val util = new YarnSparkHadoopUtil
- val renewer = util.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val util = new YarnSparkHadoopUtil
- val caught =
- intercept[SparkException] {
- util.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-
test("check different hadoop utils based on env variable") {
try {
System.setProperty("SPARK_YARN_MODE", "true")
@@ -242,40 +180,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
}
- test("Obtain tokens For HiveMetastore") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hive.metastore.kerberos.principal", "bob")
- // thrift picks up on port 0 and bails out, without trying to talk to endpoint
- hadoopConf.set("hive.metastore.uris", "http://localhost:0")
- val util = new YarnSparkHadoopUtil
- assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastoreInner(hadoopConf)
- })
- assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastore(hadoopConf)
- })
- }
- private def assertNestedHiveException(e: InvocationTargetException): Throwable = {
- val inner = e.getCause
- if (inner == null) {
- fail("No inner cause", e)
- }
- if (!inner.isInstanceOf[HiveException]) {
- fail("Not a hive exception", inner)
- }
- inner
- }
-
- test("Obtain tokens For HBase") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hbase.security.authentication", "kerberos")
- val util = new YarnSparkHadoopUtil
- intercept[ClassNotFoundException] {
- util.obtainTokenForHBaseInner(hadoopConf)
- }
- util.obtainTokenForHBase(hadoopConf) should be (None)
- }
// This test needs to live here because it depends on isYarnMode returning true, which can only
// happen in the YARN module.
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
new file mode 100644
index 0000000000..db4619e80c
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.Token
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
+
+class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+ private var credentialManager: ConfigurableCredentialManager = null
+ private var sparkConf: SparkConf = null
+ private var hadoopConf: Configuration = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ sparkConf = new SparkConf()
+ hadoopConf = new Configuration()
+ System.setProperty("SPARK_YARN_MODE", "true")
+ }
+
+ override def afterAll(): Unit = {
+ System.clearProperty("SPARK_YARN_MODE")
+
+ super.afterAll()
+ }
+
+ test("Correctly load default credential providers") {
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+ credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+ credentialManager.getServiceCredentialProvider("hive") should not be (None)
+ }
+
+ test("disable hive credential provider") {
+ sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+ credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+ credentialManager.getServiceCredentialProvider("hive") should be (None)
+ }
+
+ test("using deprecated configurations") {
+ sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+ sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+ credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+ credentialManager.getServiceCredentialProvider("hive") should be (None)
+ credentialManager.getServiceCredentialProvider("test") should not be (None)
+ credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+ }
+
+ test("verify obtaining credentials from provider") {
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+ val creds = new Credentials()
+
+    // Tokens can only be obtained from TestCredentialProvider; tokens for hdfs, hbase and hive
+    // cannot be obtained.
+ credentialManager.obtainCredentials(hadoopConf, creds)
+ val tokens = creds.getAllTokens
+ tokens.size() should be (1)
+ tokens.iterator().next().getService should be (new Text("test"))
+ }
+
+ test("verify getting credential renewal info") {
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+ val creds = new Credentials()
+
+ val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
+ .asInstanceOf[TestCredentialProvider]
+    // Only TestCredentialProvider can report the time of the next token renewal
+ val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
+ nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
+ }
+
+ test("obtain tokens For HiveMetastore") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+ // thrift picks up on port 0 and bails out, without trying to talk to endpoint
+ hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+
+ val hiveCredentialProvider = new HiveCredentialProvider()
+ val credentials = new Credentials()
+ hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
+
+ credentials.getAllTokens.size() should be (0)
+ }
+
+ test("Obtain tokens For HBase") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("hbase.security.authentication", "kerberos")
+
+ val hbaseTokenProvider = new HBaseCredentialProvider()
+ val creds = new Credentials()
+ hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
+
+ creds.getAllTokens.size should be (0)
+ }
+}
+
+class TestCredentialProvider extends ServiceCredentialProvider {
+ val tokenRenewalInterval = 86400 * 1000L
+ var timeOfNextTokenRenewal = 0L
+
+ override def serviceName: String = "test"
+
+ override def credentialsRequired(conf: Configuration): Boolean = true
+
+ override def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ if (creds == null) {
+      // Guard against other unit test failures.
+ return None
+ }
+
+ val emptyToken = new Token()
+ emptyToken.setService(new Text("test"))
+ creds.addToken(emptyToken.getService, emptyToken)
+
+ val currTime = System.currentTimeMillis()
+ timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
+
+ Some(timeOfNextTokenRenewal)
+ }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
new file mode 100644
index 0000000000..7b2da3f26e
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Matchers, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class HDFSCredentialProviderSuite
+ extends SparkFunSuite
+ with PrivateMethodTester
+ with Matchers {
+ private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
+
+ private def getTokenRenewer(
+ hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
+ hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+ }
+
+ private var hdfsCredentialProvider: HDFSCredentialProvider = null
+
+ override def beforeAll() {
+ super.beforeAll()
+
+ if (hdfsCredentialProvider == null) {
+ hdfsCredentialProvider = new HDFSCredentialProvider()
+ }
+ }
+
+ override def afterAll() {
+ if (hdfsCredentialProvider != null) {
+ hdfsCredentialProvider = null
+ }
+
+ super.afterAll()
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val caught =
+ intercept[SparkException] {
+ getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
+}