Diffstat (limited to 'core')
6 files changed, 206 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
new file mode 100644
index 0000000000..80363aa3a1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+import scala.util.control.NonFatal
+
+private[spark] class ExecutorDelegationTokenUpdater(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration) extends Logging {
+
+  @volatile private var lastCredentialsFileSuffix = 0
+
+  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+
+  private val delegationTokenRenewer =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
+  private val executorUpdaterRunnable =
+    new Runnable {
+      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+    }
+
+  def updateCredentialsIfRequired(): Unit = {
+    try {
+      val credentialsFilePath = new Path(credentialsFile)
+      val remoteFs = FileSystem.get(hadoopConf)
+      SparkHadoopUtil.get.listFilesSorted(
+        remoteFs, credentialsFilePath.getParent,
+        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.foreach { credentialsStatus =>
+        val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+        if (suffix > lastCredentialsFileSuffix) {
+          logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+          val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+          lastCredentialsFileSuffix = suffix
+          UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+          logInfo("Tokens updated from credentials file.")
+        } else {
+          // Check every hour to see if new credentials arrived.
+ logInfo("Updated delegation tokens were expected, but the driver has not updated the " + + "tokens yet, will check again in an hour.") + delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS) + return + } + } + val timeFromNowToRenewal = + SparkHadoopUtil.get.getTimeFromNowToRenewal( + sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials) + if (timeFromNowToRenewal <= 0) { + executorUpdaterRunnable.run() + } else { + logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.") + delegationTokenRenewer.schedule( + executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS) + } + } catch { + // Since the file may get deleted while we are reading it, catch the Exception and come + // back in an hour to try again + case NonFatal(e) => + logWarning("Error while trying to update credentials, will try again in 1 hour", e) + delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS) + } + } + + private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = { + val stream = remoteFs.open(tokenPath) + try { + val newCredentials = new Credentials() + newCredentials.readTokenStorageStream(stream) + newCredentials + } finally { + stream.close() + } + } + + def stop(): Unit = { + delegationTokenRenewer.shutdown() + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index cfaebf9ea5..00194fba84 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,12 +17,16 @@ package org.apache.spark.deploy +import java.io.{ByteArrayInputStream, DataInputStream} import java.lang.reflect.Method import java.security.PrivilegedExceptionAction +import java.util.{Arrays, Comparator} +import com.google.common.primitives.Longs import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.fs.FileSystem.Statistics +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.JobContext import org.apache.hadoop.security.{Credentials, UserGroupInformation} @@ -32,6 +36,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils import scala.collection.JavaConversions._ +import scala.concurrent.duration._ /** * :: DeveloperApi :: @@ -39,7 +44,8 @@ import scala.collection.JavaConversions._ */ @DeveloperApi class SparkHadoopUtil extends Logging { - val conf: Configuration = newConfiguration(new SparkConf()) + private val sparkConf = new SparkConf() + val conf: Configuration = newConfiguration(sparkConf) UserGroupInformation.setConfiguration(conf) /** @@ -201,6 +207,61 @@ class SparkHadoopUtil extends Logging { if (baseStatus.isDir) recurse(basePath) else Array(baseStatus) } + /** + * Lists all the files in a directory with the specified prefix, and does not end with the + * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of + * the respective files. 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index cfaebf9ea5..00194fba84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,12 +17,16 @@
 package org.apache.spark.deploy
 
+import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}
 
+import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,6 +36,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
+import scala.concurrent.duration._
 
 /**
  * :: DeveloperApi ::
@@ -39,7 +44,8 @@ import scala.collection.JavaConversions._
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration(new SparkConf())
+  private val sparkConf = new SparkConf()
+  val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -201,6 +207,61 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
 
+  /**
+   * Lists all the files in a directory that start with the specified prefix and do not end
+   * with the given suffix. The returned {{FileStatus}} instances are sorted by the
+   * modification times of the respective files.
+   */
+  def listFilesSorted(
+      remoteFs: FileSystem,
+      dir: Path,
+      prefix: String,
+      exclusionSuffix: String): Array[FileStatus] = {
+    val fileStatuses = remoteFs.listStatus(dir,
+      new PathFilter {
+        override def accept(path: Path): Boolean = {
+          val name = path.getName
+          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+        }
+      })
+    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+      override def compare(o1: FileStatus, o2: FileStatus): Int = {
+        Longs.compare(o1.getModificationTime, o2.getModificationTime)
+      }
+    })
+    fileStatuses
+  }
+
+  /**
+   * How much time is remaining (in millis) from now to (fraction * renewal time for the token
+   * that is valid the latest)?
+   * This will return a negative (or zero) value if the fraction of validity has already expired.
+   */
+  def getTimeFromNowToRenewal(
+      sparkConf: SparkConf,
+      fraction: Double,
+      credentials: Credentials): Long = {
+    val now = System.currentTimeMillis()
+
+    val renewalInterval =
+      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+      .map { t =>
+        val identifier = new DelegationTokenIdentifier()
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+      }.foldLeft(0L)(math.max)
+  }
+
+  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+    val fileName = credentialsPath.getName
+    fileName.substring(
+      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+  }
+
+
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
 
   /**
@@ -251,6 +312,10 @@ object SparkHadoopUtil {
     }
   }
 
+  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
   def get: SparkHadoopUtil = {
     hadoop
   }
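The renewal arithmetic is easiest to see with concrete numbers. A worked sketch with invented timestamps; the fraction 0.8 matches the caller in ExecutorDelegationTokenUpdater above:

    // Token issued at t0; default 24h renewal interval; fraction 0.8.
    val issueDate = 1430000000000L              // t0: hypothetical issue time, in ms
    val renewalInterval = 24L * 60 * 60 * 1000  // default for spark.yarn.token.renewal.interval
    val now = issueDate + 12L * 60 * 60 * 1000  // pretend 12 hours have passed

    val remaining = (issueDate + 0.8 * renewalInterval).toLong - now
    // remaining == 7.2 hours in millis ((t0 + 19.2h) minus (t0 + 12h)); a result <= 0
    // would mean 80% of the validity window is already gone and a refresh is due now.

getSuffixForCredentialsPath, in turn, just parses the counter appended to each credentials file name, so a name like credentials-3 (illustrative) yields 3, which is what lastCredentialsFileSuffix is compared against on the executor.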
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0d149e703a..4b89ede427 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,6 +401,10 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
+      // Yarn client or cluster
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c621b8fc86..c0e4c77190 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
+  var principal: String = null
+  var keytab: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROXY_USER =>
         proxyUser = value
 
+      case PRINCIPAL =>
+        principal = value
+
+      case KEYTAB =>
+        keytab = value
+
       case HELP =>
         printUsageAndExit(0)
@@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |  --num-executors NUM         Number of executors to launch (Default: 2).
        |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
        |                              working directory of each executor.
+       |  --principal PRINCIPAL       Principal to be used to log in to KDC while running on
+       |                              secure HDFS.
+       |  --keytab KEYTAB             The full path to the file that contains the keytab for the
+       |                              principal specified above. This keytab will be copied to
+       |                              the node running the Application Master via the Secure
+       |                              Distributed Cache, for renewing the login tickets and the
+       |                              delegation tokens periodically.
      """.stripMargin
     )
     SparkSubmit.exitFn()
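With these two options plumbed through, a long-running application on secure HDFS would be submitted along the lines of spark-submit --master yarn-cluster --principal alice@EXAMPLE.COM --keytab /etc/security/keytabs/alice.keytab ... (the principal and keytab path here are placeholders); SparkSubmit then hands both values on to the YARN client via the --principal and --keytab client options registered above.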
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 79aed90b53..9e0f06ab76 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.rpc._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,6 +168,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         driverConf.set(key, value)
       }
     }
+    var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
+    if (driverConf.contains("spark.yarn.credentials.file")) {
+      logInfo("Will periodically update credentials from: " +
+        driverConf.get("spark.yarn.credentials.file"))
+      // Periodically update the credentials for this user to ensure HDFS tokens get updated.
+      tokenUpdaterOption =
+        Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
+      tokenUpdaterOption.get.updateCredentialsIfRequired()
+    }
+
     val env = SparkEnv.createExecutorEnv(
       driverConf, executorId, hostname, port, cores, isLocal = false)
@@ -183,6 +193,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
     env.rpcEnv.awaitTermination()
+    tokenUpdaterOption.foreach(_.stop())
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7352fa1fe9..f107148f3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
+    override protected def log = CoarseGrainedSchedulerBackend.this.log
 
     private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // Ignoring the task kill since the executor is not registered.
         logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
+
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + executorId)
           context.reply(RegisteredExecutor)
-
           addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         properties += ((key, value))
       }
     }
+
     // TODO (prashant) send conf instead of properties
     driverEndpoint = rpcEnv.setupEndpoint(
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
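Two things worth noting in the executor-side wiring: the updater is created and run before SparkEnv.createExecutorEnv, presumably so that an executor joining long after launch picks up already-rotated tokens before it touches HDFS, and tokenUpdaterOption.foreach(_.stop()) after awaitTermination() shuts the refresh thread down cleanly once the RPC environment terminates. The CoarseGrainedSchedulerBackend hunks are whitespace tidy-ups plus one functional line, the log override, which points the inner DriverEndpoint at the enclosing backend's logger instead of letting the Logging trait create a second one.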