aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2015-04-30 13:03:23 -0500
committerThomas Graves <tgraves@apache.org>2015-04-30 13:03:23 -0500
commit6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c (patch)
tree662e09f980e9eeaff8bd572537f9c2549c193337 /core
parent7dacc08ab36188991a001df23880167433844767 (diff)
downloadspark-6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.tar.gz
spark-6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.tar.bz2
spark-6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.zip
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS
Current Spark apps running on Secure YARN/HDFS would not be able to write data to HDFS after 7 days, since delegation tokens cannot be renewed beyond that. This means Spark Streaming apps will not be able to run on Secure YARN. This commit adds basic functionality to fix this issue. In this patch: - new parameters are added - principal and keytab, which can be used to login to a KDC - the client logs in, and then get tokens to start the AM - the keytab is copied to the staging directory - the AM waits for 60% of the time till expiry of the tokens and then logs in using the keytab - each time after 60% of the time, new tokens are created and sent to the executors Currently, to avoid complicating the architecture, we set the keytab and principal in the SparkHadoopUtil singleton, and schedule a login. Once the login is completed, a callback is scheduled. This is being posted for feedback, so I can gather feedback on the general implementation. There are currently a bunch of things to do: - [x] logging - [x] testing - I plan to manually test this soon. If you have ideas of how to add unit tests, comment. - [x] add code to ensure that if these params are set in non-YARN cluster mode, we complain - [x] documentation - [x] Have the executors request for credentials from the AM, so that retries are possible. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #4688 from harishreedharan/kerberos-longrunning and squashes the following commits: 36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments. 611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens. 09fe224 [Hari Shreedharan] Use token.renew to get token's renewal interval rather than using hdfs-site.xml 6963bbc [Hari Shreedharan] Schedule renewal in AM before starting user class. Else, a restarted AM cannot access HDFS if the user class tries to. 072659e [Hari Shreedharan] Fix build failure caused by thread factory getting moved to ThreadUtils. f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens. ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates. 7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required. e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message. 0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 7f1bc58 [Hari Shreedharan] Minor fixes, cleanup. bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup. f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files. 2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pikc up the latest suffix from HDFS if the AM is restarted. 61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS. 62c45ce [Hari Shreedharan] Relogin from keytab periodically. fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues. 42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master. 0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning 55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity. 9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes. f4fd711 [Hari Shreedharan] Fix SparkConf usage. 2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough for it to cause issues on HDFS. af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required. f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file. f6954da [Hari Shreedharan] Got rid of Akka communication to renew, instead the executors check a known file's modification time to read the credentials. 5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues. b4cb917 [Hari Shreedharan] Send keytab to AM via DistributedCache rather than directly via HDFS 0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire. d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens() 8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object. fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start() 41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests. bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with impl in YarnSparkHadoopUtil. f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled. 2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS. ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos 77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala105
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala69
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala4
6 files changed, 206 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
new file mode 100644
index 0000000000..80363aa3a1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+import scala.util.control.NonFatal
+
+private[spark] class ExecutorDelegationTokenUpdater(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration) extends Logging {
+
+ @volatile private var lastCredentialsFileSuffix = 0
+
+ private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+
+ private val delegationTokenRenewer =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+ // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
+ private val executorUpdaterRunnable =
+ new Runnable {
+ override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+ }
+
+ def updateCredentialsIfRequired(): Unit = {
+ try {
+ val credentialsFilePath = new Path(credentialsFile)
+ val remoteFs = FileSystem.get(hadoopConf)
+ SparkHadoopUtil.get.listFilesSorted(
+ remoteFs, credentialsFilePath.getParent,
+ credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.foreach { credentialsStatus =>
+ val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+ if (suffix > lastCredentialsFileSuffix) {
+ logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+ val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+ lastCredentialsFileSuffix = suffix
+ UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+ logInfo("Tokens updated from credentials file.")
+ } else {
+ // Check every hour to see if new credentials arrived.
+ logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+ "tokens yet, will check again in an hour.")
+ delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+ return
+ }
+ }
+ val timeFromNowToRenewal =
+ SparkHadoopUtil.get.getTimeFromNowToRenewal(
+ sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
+ if (timeFromNowToRenewal <= 0) {
+ executorUpdaterRunnable.run()
+ } else {
+ logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
+ delegationTokenRenewer.schedule(
+ executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+ }
+ } catch {
+ // Since the file may get deleted while we are reading it, catch the Exception and come
+ // back in an hour to try again
+ case NonFatal(e) =>
+ logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+ delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+ }
+ }
+
+ private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+ val stream = remoteFs.open(tokenPath)
+ try {
+ val newCredentials = new Credentials()
+ newCredentials.readTokenStorageStream(stream)
+ newCredentials
+ } finally {
+ stream.close()
+ }
+ }
+
+ def stop(): Unit = {
+ delegationTokenRenewer.shutdown()
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index cfaebf9ea5..00194fba84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,12 +17,16 @@
package org.apache.spark.deploy
+import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}
+import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,6 +36,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
+import scala.concurrent.duration._
/**
* :: DeveloperApi ::
@@ -39,7 +44,8 @@ import scala.collection.JavaConversions._
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
- val conf: Configuration = newConfiguration(new SparkConf())
+ private val sparkConf = new SparkConf()
+ val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)
/**
@@ -201,6 +207,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}
+ /**
+ * Lists all the files in a directory with the specified prefix, and does not end with the
+ * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
+ * the respective files.
+ */
+ def listFilesSorted(
+ remoteFs: FileSystem,
+ dir: Path,
+ prefix: String,
+ exclusionSuffix: String): Array[FileStatus] = {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
+ }
+ })
+ fileStatuses
+ }
+
+ /**
+ * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
+ * is valid the latest)?
+ * This will return -ve (or 0) value if the fraction of validity has already expired.
+ */
+ def getTimeFromNowToRenewal(
+ sparkConf: SparkConf,
+ fraction: Double,
+ credentials: Credentials): Long = {
+ val now = System.currentTimeMillis()
+
+ val renewalInterval =
+ sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+ credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .map { t =>
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+ }.foldLeft(0L)(math.max)
+ }
+
+
+ private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+ val fileName = credentialsPath.getName
+ fileName.substring(
+ fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+ }
+
+
private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
/**
@@ -251,6 +312,10 @@ object SparkHadoopUtil {
}
}
+ val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+ val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
def get: SparkHadoopUtil = {
hadoop
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0d149e703a..4b89ede427 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,6 +401,10 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+ // Yarn client or cluster
+ OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+ OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c621b8fc86..c0e4c77190 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
+ var principal: String = null
+ var keytab: String = null
// Standalone cluster mode only
var supervise: Boolean = false
@@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PROXY_USER =>
proxyUser = value
+ case PRINCIPAL =>
+ principal = value
+
+ case KEYTAB =>
+ keytab = value
+
case HELP =>
printUsageAndExit(0)
@@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
+ | --principal PRINCIPAL Principal to be used to login to KDC, while running on
+ | secure HDFS.
+ | --keytab KEYTAB The full path to the file that contains the keytab for the
+ | principal specified above. This keytab will be copied to
+ | the node running the Application Master via the Secure
+ | Distributed Cache, for renewing the login tickets and the
+ | delegation tokens periodically.
""".stripMargin
)
SparkSubmit.exitFn()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 79aed90b53..9e0f06ab76 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
import org.apache.spark.rpc._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,6 +168,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
+ var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
+ if (driverConf.contains("spark.yarn.credentials.file")) {
+ logInfo("Will periodically update credentials from: " +
+ driverConf.get("spark.yarn.credentials.file"))
+ // Periodically update the credentials for this user to ensure HDFS tokens get updated.
+ tokenUpdaterOption =
+ Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
+ tokenUpdaterOption.get.updateCredentialsIfRequired()
+ }
+
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
@@ -183,6 +193,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
+ tokenUpdaterOption.foreach(_.stop())
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7352fa1fe9..f107148f3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
+
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
+
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
-
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
properties += ((key, value))
}
}
+
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))