author     Patrick Wendell <patrick@databricks.com>  2015-04-30 14:59:20 -0700
committer  Patrick Wendell <patrick@databricks.com>  2015-04-30 14:59:20 -0700
commit     e0628f2fae7f99d096f9dd625876a60d11020d9b (patch)
tree       33739db91af80b01dddf1935f24a73a6267a1a43 /yarn
parent     adbdb19a7d2cc939795f0cecbdc07c605dc946c1 (diff)
Revert "[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS"
This reverts commit 6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala | 203
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala        |  17
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                   | 123
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala          |  10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala      |  60
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala              |  51
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala         |   2
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala |  62
8 files changed, 106 insertions(+), 422 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
deleted file mode 100644
index 9ff02046de..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn
-
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.deploy.SparkHadoopUtil
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.ThreadUtils
-
-/*
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while writing to secure HDFS. The
- * scheduleLoginFromKeytab method is called on the driver when the
- * CoarseGrainedSchedulerBackend starts up. This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original delegation tokens used for the container
- * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
- * to a new file, with a monotonically increasing suffix). After this, the credentials are
- * updated once 75% of the new tokens renewal interval has elapsed.
- *
- * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
- * validity of the original tokens has elapsed. At that time the executor finds the
- * credentials file with the latest timestamp and checks if it has read those credentials
- * before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process repeats once 80% of the validity of the new tokens has elapsed.
- */
-private[yarn] class AMDelegationTokenRenewer(
- sparkConf: SparkConf,
- hadoopConf: Configuration) extends Logging {
-
- private var lastCredentialsFileSuffix = 0
-
- private val delegationTokenRenewer =
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
- private val hadoopUtil = YarnSparkHadoopUtil.get
-
- private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
- private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
-
- /**
- * Schedule a login from the keytab and principal set using the --principal and --keytab
- * arguments to spark-submit. This login happens only when the credentials of the current user
- * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
- * SparkConf to do the login. This method is a no-op in non-YARN mode.
- *
- */
- private[spark] def scheduleLoginFromKeytab(): Unit = {
- val principal = sparkConf.get("spark.yarn.principal")
- val keytab = sparkConf.get("spark.yarn.keytab")
-
- /**
- * Schedule re-login and creation of new tokens. If tokens have already expired, this method
- * will synchronously create new ones.
- */
- def scheduleRenewal(runnable: Runnable): Unit = {
- val credentials = UserGroupInformation.getCurrentUser.getCredentials
- val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
- // Run now!
- if (renewalInterval <= 0) {
- logInfo("HDFS tokens have expired, creating new tokens now.")
- runnable.run()
- } else {
- logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
- delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
- }
- }
-
- // This thread periodically runs on the driver to update the delegation tokens on HDFS.
- val driverTokenRenewerRunnable =
- new Runnable {
- override def run(): Unit = {
- try {
- writeNewTokensToHDFS(principal, keytab)
- cleanupOldFiles()
- } catch {
- case e: Exception =>
- // Log the error and try to write new tokens back in an hour
- logWarning("Failed to write out new credentials to HDFS, will try again in an " +
- "hour! If this happens too often tasks will fail.", e)
- delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
- return
- }
- scheduleRenewal(this)
- }
- }
- // Schedule update of credentials. This handles the case of updating the tokens right now
- // as well, since the renewal interval will be 0, and the thread will get scheduled
- // immediately.
- scheduleRenewal(driverTokenRenewerRunnable)
- }
-
- // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
- // least numFilesToKeep files are kept for safety
- private def cleanupOldFiles(): Unit = {
- import scala.concurrent.duration._
- try {
- val remoteFs = FileSystem.get(hadoopConf)
- val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
- val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
- hadoopUtil.listFilesSorted(
- remoteFs, credentialsPath.getParent,
- credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .dropRight(numFilesToKeep)
- .takeWhile(_.getModificationTime < thresholdTime)
- .foreach(x => remoteFs.delete(x.getPath, true))
- } catch {
- // Such errors are not fatal, so don't throw. Make sure they are logged though
- case e: Exception =>
- logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
- "warnings there may be an issue with your HDFS cluster.")
- }
- }
-
- private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
- // Keytab is copied by YARN to the working directory of the AM, so full path is
- // not needed.
-
- // HACK:
- // HDFS will not issue new delegation tokens, if the Credentials object
- // passed in already has tokens for that FS even if the tokens are expired (it really only
- // checks if there are tokens for the service, and not if they are valid). So the only real
- // way to get new tokens is to make sure a different Credentials object is used each time to
- // get new tokens and then the new tokens are copied over to the current user's Credentials.
- // So:
- // - we login as a different user and get the UGI
- // - use that UGI to get the tokens (see doAs block below)
- // - copy the tokens over to the current user's credentials (this will overwrite the tokens
- // in the current user's Credentials object for this FS).
- // The login to KDC happens each time new tokens are required, but this is rare enough to not
- // have to worry about (like once every day or so). This makes the code clearer than having
- // to log in and then re-login every time (the HDFS API may not re-login, since we don't use
- // this UGI directly for HDFS communication).
- logInfo(s"Attempting to login to KDC using principal: $principal")
- val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
- logInfo("Successfully logged into KDC.")
- val tempCreds = keytabLoggedInUGI.getCredentials
- val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
- val dst = credentialsPath.getParent
- keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
- // Get a copy of the credentials
- override def run(): Void = {
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
- hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
- null
- }
- })
- // Add the temp credentials back to the original ones.
- UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
- val remoteFs = FileSystem.get(hadoopConf)
- // If lastCredentialsFileSuffix is 0, the AM has just started or been restarted. If it was
- // restarted, credentials files with a suffix > 0 may already exist on HDFS, so find the
- // newest file and update lastCredentialsFileSuffix before writing the next one.
- if (lastCredentialsFileSuffix == 0) {
- hadoopUtil.listFilesSorted(
- remoteFs, credentialsPath.getParent,
- credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .lastOption.foreach { status =>
- lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
- }
- }
- val nextSuffix = lastCredentialsFileSuffix + 1
- val tokenPathStr =
- sparkConf.get("spark.yarn.credentials.file") +
- SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
- val tokenPath = new Path(tokenPathStr)
- val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
- val credentials = UserGroupInformation.getCurrentUser.getCredentials
- credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
- logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
- remoteFs.rename(tempTokenPath, tokenPath)
- logInfo("Delegation token file rename complete.")
- lastCredentialsFileSuffix = nextSuffix
- }
-
- def stop(): Unit = {
- delegationTokenRenewer.shutdown()
- }
-}
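
The HACK comment above walks through a three-step dance: log in as a fresh user, fetch tokens under that user's UGI, then copy them into the current user's credentials. A minimal sketch of that pattern using only the Hadoop security API (the "yarn" renewer string is an illustrative placeholder; the real code resolves the renewer from the cluster config):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

object FreshTokenSketch {
  def fetchFreshTokens(principal: String, keytab: String, conf: Configuration): Unit = {
    // 1. Log in as a separate user, so HDFS sees a Credentials object with no
    //    pre-existing (possibly expired) tokens for the filesystem.
    val freshUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    val tempCreds = freshUGI.getCredentials

    // 2. Obtain new delegation tokens inside a doAs block, acting as that user.
    freshUGI.doAs(new PrivilegedExceptionAction[Void] {
      override def run(): Void = {
        FileSystem.get(conf).addDelegationTokens("yarn", tempCreds) // renewer is illustrative
        null
      }
    })

    // 3. Copy the new tokens over the current user's credentials, replacing the
    //    stale tokens registered for the same filesystem service.
    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
  }
}
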
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 82f2b7e1dd..70cb57ffd8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -75,8 +75,6 @@ private[spark] class ApplicationMaster(
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
- private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
-
final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
@@ -127,15 +125,6 @@ private[spark] class ApplicationMaster(
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
- // If the credentials file config is present, we must periodically renew tokens. So create
- // a new AMDelegationTokenRenewer
- if (sparkConf.contains("spark.yarn.credentials.file")) {
- delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
- // If a principal and keytab have been set, use that to create new credentials for executors
- // periodically
- delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
- }
-
if (isClusterMode) {
runDriver(securityMgr)
} else {
@@ -200,7 +189,6 @@ private[spark] class ApplicationMaster(
logDebug("shutting down user thread")
userClassThread.interrupt()
}
- if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
}
}
}
@@ -247,12 +235,12 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
- val driverEndpoint = rpcEnv.setupEndpointRef(
+ val driverEndpont = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
- rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+ rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, isClusterMode))
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -506,7 +494,6 @@ private[spark] class ApplicationMaster(
override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
-
}
override def receive: PartialFunction[Any, Unit] = {
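
The renewer this hunk stops instantiating scheduled each re-login once 75% of the tokens' renewal interval had elapsed, running immediately if the tokens were already expired (see the class comment in the deleted file above). A minimal sketch of that scheduling decision, where millisUntilRenewal is a hypothetical stand-in for hadoopUtil.getTimeFromNowToRenewal:

import java.util.concurrent.{Executors, TimeUnit}

object RenewalSchedulingSketch {
  // Single-threaded scheduler, as in the deleted AMDelegationTokenRenewer.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Hypothetical stand-in for hadoopUtil.getTimeFromNowToRenewal: millis until
  // the given fraction (0.75 on the driver) of the token lifetime has elapsed.
  def millisUntilRenewal(issueDate: Long, expiration: Long, fraction: Double): Long =
    issueDate + ((expiration - issueDate) * fraction).toLong - System.currentTimeMillis()

  def scheduleRenewal(task: Runnable, delayMillis: Long): Unit = {
    if (delayMillis <= 0) {
      task.run() // tokens already expired: renew synchronously, right now
    } else {
      scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS)
    }
  }
}
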
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bb126a4637..4abcf7307a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,9 @@
package org.apache.spark.deploy.yarn
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
+import java.io.{File, FileOutputStream}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-import java.util.UUID
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
@@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{TokenIdentifier, Token}
@@ -51,8 +50,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -70,13 +69,11 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
- private var credentials: Credentials = null
+ private val credentials = UserGroupInformation.getCurrentUser.getCredentials
private val amMemoryOverhead = args.amMemoryOverhead // MB
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
private val isClusterMode = args.isClusterMode
-
- private var loginFromKeytab = false
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
@@ -91,8 +88,6 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
- // Set up the credentials before doing anything else, so we don't have issues at any point.
- setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
@@ -224,12 +219,12 @@ private[spark] class Client(
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
- YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
+ val nns = getNameNodesToAccess(sparkConf) + dst
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
+ obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
obtainTokenForHBase(hadoopConf, credentials)
@@ -248,20 +243,6 @@ private[spark] class Client(
"for alternatives.")
}
- // If we passed in a keytab, make sure we copy the keytab to the staging directory on
- // HDFS, and setup the relevant environment vars, so the AM can login again.
- if (loginFromKeytab) {
- logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
- " via the YARN Secure Distributed Cache.")
- val localUri = new URI(args.keytab)
- val localPath = getQualifiedLocalPath(localUri, hadoopConf)
- val destinationPath = copyFileToRemote(dst, localPath, replication)
- val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
- distCacheMgr.addResource(
- destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
- sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
- }
-
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -405,28 +386,6 @@ private[spark] class Client(
}
/**
- * Get the renewal interval for tokens.
- */
- private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
- // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
- // those will fail with an access control issue. So create new tokens with the logged in
- // user as renewer.
- val creds = new Credentials()
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
- YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
- nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
- val t = creds.getAllTokens
- .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- .head
- val newExpiration = t.renew(hadoopConf)
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal Interval set to $interval")
- interval
- }
-
- /**
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
@@ -437,16 +396,7 @@ private[spark] class Client(
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
- if (loginFromKeytab) {
- val remoteFs = FileSystem.get(hadoopConf)
- val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
- val credentialsFile = "credentials-" + UUID.randomUUID().toString
- sparkConf.set(
- "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
- logInfo(s"Credentials file set to: $credentialsFile")
- val renewalInterval = getTokenRenewalInterval(stagingDirPath)
- sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
- }
+
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@@ -511,6 +461,7 @@ private[spark] class Client(
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
+
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
@@ -681,24 +632,6 @@ private[spark] class Client(
amContainer
}
- def setupCredentials(): Unit = {
- if (args.principal != null) {
- require(args.keytab != null, "Keytab must be specified when principal is specified.")
- logInfo("Attempting to login to the Kerberos" +
- s" using principal: ${args.principal} and keytab: ${args.keytab}")
- val f = new File(args.keytab)
- // Generate a file name that can be used for the keytab file, that does not conflict
- // with any user file.
- val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
- UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
- loginFromKeytab = true
- sparkConf.set("spark.yarn.keytab", keytabFileName)
- sparkConf.set("spark.yarn.principal", args.principal)
- logInfo("Successfully logged into the KDC.")
- }
- credentials = UserGroupInformation.getCurrentUser.getCredentials
- }
-
/**
* Report the state of an application until it has exited, either successfully or
* due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
@@ -1055,6 +988,46 @@ object Client extends Logging {
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
+ * Get the list of namenodes the user may access.
+ */
+ private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get("spark.yarn.access.namenodes", "")
+ .split(",")
+ .map(_.trim())
+ .filter(!_.isEmpty)
+ .map(new Path(_))
+ .toSet
+ }
+
+ private[yarn] def getTokenRenewer(conf: Configuration): String = {
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ logDebug("delegation token renewer is: " + delegTokenRenewer)
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ }
+ delegTokenRenewer
+ }
+
+ /**
+ * Obtains tokens for the namenodes passed in and adds them to the credentials.
+ */
+ private def obtainTokensForNamenodes(
+ paths: Set[Path],
+ conf: Configuration,
+ creds: Credentials): Unit = {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val delegTokenRenewer = getTokenRenewer(conf)
+ paths.foreach { dst =>
+ val dstFs = dst.getFileSystem(conf)
+ logDebug("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ }
+ }
+ }
+
+ /**
+ * Obtains a token for the Hive metastore and adds it to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
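
Taken together, the helpers restored to Client above parse spark.yarn.access.namenodes into a set of paths and ask each filesystem for delegation tokens renewable by the YARN principal. A minimal usage sketch assuming security is enabled (the renewer string here is hard-coded for illustration; Client derives it via Master.getMasterPrincipal):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

object TokenSetupSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
    val hadoopConf = new Configuration()
    val creds = UserGroupInformation.getCurrentUser.getCredentials

    // Same parsing as getNameNodesToAccess above: comma-separated URIs -> Set[Path].
    val nns: Set[Path] = sparkConf.get("spark.yarn.access.namenodes", "")
      .split(",").map(_.trim).filter(_.nonEmpty).map(new Path(_)).toSet

    if (UserGroupInformation.isSecurityEnabled) {
      val renewer = "yarn/rm-host@EXAMPLE.COM" // illustrative; see getTokenRenewer above
      nns.foreach(p => p.getFileSystem(hadoopConf).addDelegationTokens(renewer, creds))
    }
  }
}
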
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 5653c9f14d..1423533470 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -42,8 +42,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
- var principal: String = null
- var keytab: String = null
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
@@ -233,14 +231,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
archives = value
args = tail
- case ("--principal") :: value :: tail =>
- principal = value
- args = tail
-
- case ("--keytab") :: value :: tail =>
- keytab = value
- args = tail
-
case Nil =>
case _ =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a5c454e2e5..5881dc5ffa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,26 +17,25 @@
package org.apache.spark.deploy.yarn
-import java.io._
+import java.io.File
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.HashMap
import scala.util.Try
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.{Master, JobConf}
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils
/**
@@ -83,48 +82,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
if (credentials != null) credentials.getSecretKey(new Text(key)) else null
}
- /**
- * Get the list of namenodes the user may access.
- */
- def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get("spark.yarn.access.namenodes", "")
- .split(",")
- .map(_.trim())
- .filter(!_.isEmpty)
- .map(new Path(_))
- .toSet
- }
-
- def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
- delegTokenRenewer
- }
-
- /**
- * Obtains tokens for the namenodes passed in and adds them to the credentials.
- */
- def obtainTokensForNamenodes(
- paths: Set[Path],
- conf: Configuration,
- creds: Credentials,
- renewer: Option[String] = None
- ): Unit = {
- if (UserGroupInformation.isSecurityEnabled()) {
- val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
- paths.foreach { dst =>
- val dstFs = dst.getFileSystem(conf)
- logInfo("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(delegTokenRenewer, creds)
- }
- }
- }
-
}
object YarnSparkHadoopUtil {
@@ -143,14 +100,6 @@ object YarnSparkHadoopUtil {
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = Priority.newInstance(1)
- def get: YarnSparkHadoopUtil = {
- val yarnMode = java.lang.Boolean.valueOf(
- System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
- if (!yarnMode) {
- throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!")
- }
- SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
- }
/**
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.
@@ -263,4 +212,3 @@ object YarnSparkHadoopUtil {
classPathSeparatorField.get(null).asInstanceOf[String]
}
}
-
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 508819e242..a51c2005cb 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,6 +151,57 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
}
}
+ test("check access nns empty") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "")
+ val nns = Client.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns unset") {
+ val sparkConf = new SparkConf()
+ val nns = Client.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+ val nns = Client.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access nns space") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+ val nns = Client.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access two nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+ val nns = Client.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val renewer = Client.getTokenRenewer(hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val caught =
+ intercept[SparkException] {
+ Client.getTokenRenewer(hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d3c606e0ed..3877da4120 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -86,7 +86,6 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
tempDir = Utils.createTempDir()
logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
- System.setProperty("SPARK_YARN_MODE", "true")
val logConfFile = new File(logConfDir, "log4j.properties")
Files.write(LOG4J_CONF, logConfFile, UTF_8)
@@ -129,7 +128,6 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
override def afterAll() {
yarnCluster.stop()
- System.clearProperty("SPARK_YARN_MODE")
super.afterAll()
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index e10b985c3c..9395316b71 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -29,7 +27,7 @@ import org.scalatest.{FunSuite, Matchers}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.Utils
@@ -175,62 +173,4 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
}
}
-
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val util = new YarnSparkHadoopUtil
- val renewer = util.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val util = new YarnSparkHadoopUtil
- val caught =
- intercept[SparkException] {
- util.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
}
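
For reference, the executor-side half described in the deleted renewer's class comment — find the credentials file with the highest suffix (c-1, c-2, ...), read it only if its suffix is newer than the last one seen, and merge it into the current UGI — reduces to roughly the following. The class and helper names here are illustrative, not Spark's API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

class CredentialsPickerSketch(conf: Configuration, credentialsFilePrefix: String) {
  private var lastSuffixRead = 0

  // Files are named <prefix>-1, <prefix>-2, ... per the class comment above.
  private def suffixOf(p: Path): Int = p.getName.split("-").last.toInt

  def updateCredentialsIfRequired(): Unit = {
    val prefixPath = new Path(credentialsFilePrefix)
    val fs = FileSystem.get(conf)
    val candidates = fs.listStatus(prefixPath.getParent)
      .map(_.getPath)
      .filter(_.getName.startsWith(prefixPath.getName))
    if (candidates.nonEmpty) {
      val newest = candidates.maxBy(suffixOf)
      // Only re-read if a file with a higher suffix has appeared since last time.
      if (suffixOf(newest) > lastSuffixRead) {
        val creds = Credentials.readTokenStorageFile(newest, conf)
        UserGroupInformation.getCurrentUser.addCredentials(creds)
        lastSuffixRead = suffixOf(newest)
      }
    }
  }
}
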