author    Hari Shreedharan <hshreedharan@apache.org>  2015-05-01 15:32:09 -0500
committer Thomas Graves <tgraves@apache.org>          2015-05-01 15:32:09 -0500
commit b1f4ca82d170935d15f1fe6beb9af0743b4d81cd (patch)
tree   db2a1fb19feaa2c28e201c4bdce1f7a2b864f794 /core
parent 4dc8d74491b101a794cf8d386d8c5ebc6019b75f (diff)
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS
Take 2. Does the same thing as #4688, but fixes the Hadoop-1 build.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #5823 from harishreedharan/kerberos-longrunning and squashes the following commits:

3c86bba [Hari Shreedharan] Import fixes. Import postfixOps explicitly.
4d04301 [Hari Shreedharan] Minor formatting fixes.
b5e7a72 [Hari Shreedharan] Remove reflection, use a method in SparkHadoopUtil to update the token renewer.
7bff6e9 [Hari Shreedharan] Make sure all required classes are present in the jar. Fix import order.
e851f70 [Hari Shreedharan] Move the ExecutorDelegationTokenRenewer to the yarn module. Use reflection to use it.
36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments.
611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens.
09fe224 [Hari Shreedharan] Use token.renew to get the token's renewal interval rather than using hdfs-site.xml.
6963bbc [Hari Shreedharan] Schedule renewal in the AM before starting the user class. Else, a restarted AM cannot access HDFS if the user class tries to.
072659e [Hari Shreedharan] Fix build failure caused by the thread factory getting moved to ThreadUtils.
f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens.
ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates.
7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required.
e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message.
0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
7f1bc58 [Hari Shreedharan] Minor fixes, cleanup.
bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup.
f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files.
2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pick up the latest suffix from HDFS if the AM is restarted.
61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS.
62c45ce [Hari Shreedharan] Relogin from keytab periodically.
fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues.
42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master.
0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity.
9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes.
f4fd711 [Hari Shreedharan] Fix SparkConf usage.
2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough not to cause issues on HDFS.
af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required.
f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file.
f6954da [Hari Shreedharan] Got rid of Akka communication to renew; instead the executors check a known file's modification time to read the credentials.
5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues.
b4cb917 [Hari Shreedharan] Send keytab to the AM via the DistributedCache rather than directly via HDFS.
0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire.
d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens().
8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object.
fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start().
41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests.
bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with the implementation in YarnSparkHadoopUtil.
f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled.
2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS.
ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos.
77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
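In outline, the mechanism these commits converge on is: the Application Master logs in from the keytab, periodically obtains fresh delegation tokens, and writes them to a credentials file on HDFS; each executor polls that file's modification time and merges the new tokens into its current user whenever the file changes (commits f6954da and 0985b4e). The following is a minimal sketch of the executor-side polling idea, assuming a hypothetical CredentialsPoller class and a 60-second poll interval; it illustrates the approach and is not the code this patch adds:

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Hypothetical sketch of the polling approach described in commit f6954da.
class CredentialsPoller(credentialsPath: Path, hadoopConf: Configuration) {
  private val pool = Executors.newSingleThreadScheduledExecutor()
  @volatile private var lastModified = 0L

  def start(): Unit = {
    pool.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val fs = credentialsPath.getFileSystem(hadoopConf)
        val status = fs.getFileStatus(credentialsPath)
        if (status.getModificationTime > lastModified) {
          // The AM wrote new tokens: read them and merge into the current UGI.
          val creds = new Credentials()
          val in = fs.open(credentialsPath)
          try creds.readTokenStorageStream(in) finally in.close()
          UserGroupInformation.getCurrentUser.addCredentials(creds)
          lastModified = status.getModificationTime
        }
      }
    }, 0, 60, TimeUnit.SECONDS)
  }

  def stop(): Unit = pool.shutdownNow()
}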
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 81
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 15
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 9
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4
5 files changed, 110 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index cfaebf9ea5..b563034457 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,12 +17,16 @@
package org.apache.spark.deploy
+import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}
+import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,6 +36,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
+import scala.concurrent.duration._
+import scala.language.postfixOps
/**
* :: DeveloperApi ::
@@ -39,7 +45,8 @@ import scala.collection.JavaConversions._
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
- val conf: Configuration = newConfiguration(new SparkConf())
+ private val sparkConf = new SparkConf()
+ val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)
/**
@@ -201,6 +208,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}
+ /**
+ * Lists all files in a directory whose names start with the given prefix and do not end with
+ * the given suffix. The returned {{FileStatus}} instances are sorted by the modification times
+ * of the respective files.
+ */
+ def listFilesSorted(
+ remoteFs: FileSystem,
+ dir: Path,
+ prefix: String,
+ exclusionSuffix: String): Array[FileStatus] = {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
+ }
+ })
+ fileStatuses
+ }
+
+ /**
+ * Returns the time remaining (in millis) from now until the point at which the given fraction
+ * of the renewal interval has elapsed for the most recently issued token.
+ * Returns 0 (never a negative value) if that point has already passed.
+ */
+ def getTimeFromNowToRenewal(
+ sparkConf: SparkConf,
+ fraction: Double,
+ credentials: Credentials): Long = {
+ val now = System.currentTimeMillis()
+
+ val renewalInterval =
+ sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+ credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .map { t =>
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+ }.foldLeft(0L)(math.max)
+ }
+
+
+ private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+ val fileName = credentialsPath.getName
+ fileName.substring(
+ fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+ }
+
+
private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
/**
@@ -231,6 +293,17 @@ class SparkHadoopUtil extends Logging {
}
}
}
+
+ /**
+ * Start a thread to periodically update the current user's credentials with new delegation
+ * tokens so that writes to HDFS do not fail.
+ */
+ private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}
+
+ /**
+ * Stop the thread that does the delegation token updates.
+ */
+ private[spark] def stopExecutorDelegationTokenRenewer() {}
}
object SparkHadoopUtil {
@@ -251,6 +324,10 @@ object SparkHadoopUtil {
}
}
+ val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+ val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
def get: SparkHadoopUtil = {
hadoop
}
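Taken together, the helpers added to SparkHadoopUtil above are the building blocks for managing the credentials files: list them sorted by modification time, skip half-written ".tmp" files, recover the counter suffix after an AM restart, and compute when the next renewal is due. A hedged usage sketch follows; the directory path, the "credentials" file prefix, and the 0.75 fraction are illustrative assumptions, and the snippet presumes it runs inside the org.apache.spark package since some of these helpers are private[spark]:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

val util = SparkHadoopUtil.get
val fs = FileSystem.get(util.conf)
val credentialsDir = new Path("/user/app/.sparkStaging/appId") // hypothetical location

// Newest completed credentials file; files still being written end in ".tmp" and are skipped.
val sorted = util.listFilesSorted(
  fs, credentialsDir, "credentials", SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
val latest = sorted.lastOption // ascending by modification time, so the last entry is newest

// On AM restart, continue the counter suffix from the newest file found on HDFS.
val nextSuffix = latest.map(f => util.getSuffixForCredentialsPath(f.getPath) + 1).getOrElse(0)

// Wait until 75% of the renewal interval of the freshest token has elapsed, then renew.
val creds = UserGroupInformation.getCurrentUser.getCredentials
val delayMs = util.getTimeFromNowToRenewal(new SparkConf(), 0.75, creds)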
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b8ae4af18d..af38bf80e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -400,6 +400,10 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+ // Yarn client or cluster
+ OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+ OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c621b8fc86..c0e4c77190 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
+ var principal: String = null
+ var keytab: String = null
// Standalone cluster mode only
var supervise: Boolean = false
@@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PROXY_USER =>
proxyUser = value
+ case PRINCIPAL =>
+ principal = value
+
+ case KEYTAB =>
+ keytab = value
+
case HELP =>
printUsageAndExit(0)
@@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
+ | --principal PRINCIPAL Principal to be used to log in to the KDC while running on
+ | secure HDFS.
+ | --keytab KEYTAB The full path to the file that contains the keytab for the
+ | principal specified above. This keytab will be copied to
+ | the node running the Application Master via the Secure
+ | Distributed Cache, for renewing the login tickets and the
+ | delegation tokens periodically.
""".stripMargin
)
SparkSubmit.exitFn()
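With these two options wired through SparkSubmit, a long-running application on a Kerberized cluster would be launched along the lines of: spark-submit --master yarn-cluster --principal user@EXAMPLE.COM --keytab /etc/security/keytabs/user.keytab --class com.example.LongRunningApp app.jar (the principal, keytab path, and class are placeholders). As the help text above notes, the keytab is shipped to the node running the Application Master via the secure Distributed Cache, so the submitting machine is the only one that needs the keytab on local disk.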
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 79aed90b53..ed159dec4f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,6 +20,8 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer
+import org.apache.hadoop.conf.Configuration
+
import scala.collection.mutable
import scala.util.{Failure, Success}
@@ -168,6 +170,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
+ if (driverConf.contains("spark.yarn.credentials.file")) {
+ logInfo("Will periodically update credentials from: " +
+ driverConf.get("spark.yarn.credentials.file"))
+ SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
+ }
+
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
@@ -183,6 +191,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
+ SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
}
}
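The renewer hooks called here are the no-op methods added to SparkHadoopUtil earlier in this diff; they exist so the core (and Hadoop-1) build compiles, while the YARN module supplies the real behavior (commit bcfc374). Below is a hedged sketch of what the YARN-side override plausibly looks like, based on the class name in the commit log; ExecutorDelegationTokenRenewer's constructor and methods are assumptions, not taken from this diff:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch only: ExecutorDelegationTokenRenewer's API is assumed, not shown in this diff.
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  private var tokenRenewer: Option[ExecutorDelegationTokenRenewer] = None

  override private[spark] def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
    // Begin polling the credentials file that the AM keeps up to date.
    tokenRenewer = Some(new ExecutorDelegationTokenRenewer(sparkConf, conf))
    tokenRenewer.get.updateCredentialsIfRequired()
  }

  override private[spark] def stopExecutorDelegationTokenRenewer(): Unit = {
    tokenRenewer.foreach(_.stop())
  }
}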
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7352fa1fe9..f107148f3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
+
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
+
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
-
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
properties += ((key, value))
}
}
+
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))