From e0628f2fae7f99d096f9dd625876a60d11020d9b Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 30 Apr 2015 14:59:20 -0700
Subject: Revert "[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS"

This reverts commit 6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.
---
 .../deploy/ExecutorDelegationTokenUpdater.scala     | 105 ---------------------
 .../org/apache/spark/deploy/SparkHadoopUtil.scala   |  69 +-------------
 .../org/apache/spark/deploy/SparkSubmit.scala       |   4 -
 .../apache/spark/deploy/SparkSubmitArguments.scala  |  15 ---
 .../executor/CoarseGrainedExecutorBackend.scala     |  13 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala     |   4 +-
 6 files changed, 4 insertions(+), 206 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala

(limited to 'core/src')

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
deleted file mode 100644
index 80363aa3a1..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-import scala.util.control.NonFatal
-
-private[spark] class ExecutorDelegationTokenUpdater(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
-
-  @volatile private var lastCredentialsFileSuffix = 0
-
-  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
-
-  private val delegationTokenRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
-  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
-  private val executorUpdaterRunnable =
-    new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
-    }
-
-  def updateCredentialsIfRequired(): Unit = {
-    try {
-      val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
-      SparkHadoopUtil.get.listFilesSorted(
-        remoteFs, credentialsFilePath.getParent,
-        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { credentialsStatus =>
-        val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
-        if (suffix > lastCredentialsFileSuffix) {
-          logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
-          val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
-          lastCredentialsFileSuffix = suffix
-          UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-          logInfo("Tokens updated from credentials file.")
-        } else {
-          // Check every hour to see if new credentials arrived.
-          logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
-            "tokens yet, will check again in an hour.")
-          delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
-          return
-        }
-      }
-      val timeFromNowToRenewal =
-        SparkHadoopUtil.get.getTimeFromNowToRenewal(
-          sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
-      if (timeFromNowToRenewal <= 0) {
-        executorUpdaterRunnable.run()
-      } else {
-        logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
-        delegationTokenRenewer.schedule(
-          executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
-      }
-    } catch {
-      // Since the file may get deleted while we are reading it, catch the Exception and come
-      // back in an hour to try again
-      case NonFatal(e) =>
-        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
-        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
-    }
-  }
-
-  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
-    val stream = remoteFs.open(tokenPath)
-    try {
-      val newCredentials = new Credentials()
-      newCredentials.readTokenStorageStream(stream)
-      newCredentials
-    } finally {
-      stream.close()
-    }
-  }
-
-  def stop(): Unit = {
-    delegationTokenRenewer.shutdown()
-  }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 00194fba84..cfaebf9ea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
-import java.util.{Arrays, Comparator}
 
-import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -36,7 +32,6 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
-import scala.concurrent.duration._
 
 /**
  * :: DeveloperApi ::
@@ -44,8 +39,7 @@ import scala.concurrent.duration._
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  private val sparkConf = new SparkConf()
-  val conf: Configuration = newConfiguration(sparkConf)
+  val conf: Configuration = newConfiguration(new SparkConf())
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -207,61 +201,6 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
 
-  /**
-   * Lists all the files in a directory with the specified prefix, and does not end with the
-   * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
-   * the respective files.
-   */
-  def listFilesSorted(
-      remoteFs: FileSystem,
-      dir: Path,
-      prefix: String,
-      exclusionSuffix: String): Array[FileStatus] = {
-    val fileStatuses = remoteFs.listStatus(dir,
-      new PathFilter {
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
-        }
-      })
-    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-      override def compare(o1: FileStatus, o2: FileStatus): Int = {
-        Longs.compare(o1.getModificationTime, o2.getModificationTime)
-      }
-    })
-    fileStatuses
-  }
-
-  /**
-   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
-   * is valid the latest)?
-   * This will return -ve (or 0) value if the fraction of validity has already expired.
-   */
-  def getTimeFromNowToRenewal(
-      sparkConf: SparkConf,
-      fraction: Double,
-      credentials: Credentials): Long = {
-    val now = System.currentTimeMillis()
-
-    val renewalInterval =
-      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
-
-    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .map { t =>
-      val identifier = new DelegationTokenIdentifier()
-      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-      (identifier.getIssueDate + fraction * renewalInterval).toLong - now
-    }.foldLeft(0L)(math.max)
-  }
-
-
-  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
-    val fileName = credentialsPath.getName
-    fileName.substring(
-      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
-  }
-
-
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
 
   /**
@@ -312,10 +251,6 @@ object SparkHadoopUtil {
     }
   }
 
-  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
-
-  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
-
   def get: SparkHadoopUtil = {
     hadoop
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4b89ede427..0d149e703a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,10 +401,6 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
-      // Yarn client or cluster
-      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
-      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
-
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c0e4c77190..c621b8fc86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,8 +63,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
-  var principal: String = null
-  var keytab: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -395,12 +393,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROXY_USER =>
         proxyUser = value
 
-      case PRINCIPAL =>
-        principal = value
-
-      case KEYTAB =>
-        keytab = value
-
       case HELP =>
         printUsageAndExit(0)
 
@@ -514,13 +506,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working directory of each executor.
-        |  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
-        |                              secure HDFS.
-        |  --keytab KEYTAB             The full path to the file that contains the keytab for the
-        |                              principal specified above. This keytab will be copied to
-        |                              the node running the Application Master via the Secure
-        |                              Distributed Cache, for renewing the login tickets and the
-        |                              delegation tokens periodically.
       """.stripMargin
     )
     SparkSubmit.exitFn()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9e0f06ab76..79aed90b53 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.rpc._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,16 +168,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           driverConf.set(key, value)
         }
       }
-      var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
-      if (driverConf.contains("spark.yarn.credentials.file")) {
-        logInfo("Will periodically update credentials from: " +
-          driverConf.get("spark.yarn.credentials.file"))
-        // Periodically update the credentials for this user to ensure HDFS tokens get updated.
-        tokenUpdaterOption =
-          Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
-        tokenUpdaterOption.get.updateCredentialsIfRequired()
-      }
-
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
@@ -193,7 +183,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      tokenUpdaterOption.foreach(_.stop())
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f107148f3b..7352fa1fe9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,7 +68,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
-    override protected def log = CoarseGrainedSchedulerBackend.this.log
 
     private val addressToExecutorId = new HashMap[RpcAddress, String]
 
@@ -113,7 +112,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // Ignoring the task kill since the executor is not registered.
           logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
-
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -124,6 +122,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + executorId)
           context.reply(RegisteredExecutor)
+          addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -244,7 +243,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         properties += ((key, value))
       }
     }
-
     // TODO (prashant) send conf instead of properties
     driverEndpoint = rpcEnv.setupEndpoint(
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
--
cgit v1.2.3