Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala           105
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala                           69
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                                4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala                      15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala            13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala   4
6 files changed, 206 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
new file mode 100644
index 0000000000..80363aa3a1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorDelegationTokenUpdater(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration) extends Logging {
+
+ @volatile private var lastCredentialsFileSuffix = 0
+
+ private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+
+ private val delegationTokenRenewer =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+ // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
+ private val executorUpdaterRunnable =
+ new Runnable {
+ override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+ }
+
+ def updateCredentialsIfRequired(): Unit = {
+ try {
+ val credentialsFilePath = new Path(credentialsFile)
+ val remoteFs = FileSystem.get(hadoopConf)
+ SparkHadoopUtil.get.listFilesSorted(
+ remoteFs, credentialsFilePath.getParent,
+ credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+ .lastOption.foreach { credentialsStatus =>
+ val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+ if (suffix > lastCredentialsFileSuffix) {
+ logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+ val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+ lastCredentialsFileSuffix = suffix
+ UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+ logInfo("Tokens updated from credentials file.")
+ } else {
+ // Check every hour to see if new credentials arrived.
+ logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+ "tokens yet, will check again in an hour.")
+ delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+ return
+ }
+ }
+ val timeFromNowToRenewal =
+ SparkHadoopUtil.get.getTimeFromNowToRenewal(
+ sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
+ if (timeFromNowToRenewal <= 0) {
+ executorUpdaterRunnable.run()
+ } else {
+ logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
+ delegationTokenRenewer.schedule(
+ executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+ }
+ } catch {
+ // Since the file may get deleted while we are reading it, catch the Exception and come
+ // back in an hour to try again
+ case NonFatal(e) =>
+ logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+ delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+ }
+ }
+
+ private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+ val stream = remoteFs.open(tokenPath)
+ try {
+ val newCredentials = new Credentials()
+ newCredentials.readTokenStorageStream(stream)
+ newCredentials
+ } finally {
+ stream.close()
+ }
+ }
+
+ def stop(): Unit = {
+ delegationTokenRenewer.shutdown()
+ }
+
+}
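
(Note: a minimal sketch of how executor-side code inside the org.apache.spark package might drive this class, mirroring the CoarseGrainedExecutorBackend change further down in this diff; the configuration values are hypothetical.)

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.ExecutorDelegationTokenUpdater

    // Hypothetical wiring: spark.yarn.credentials.file is assumed to point at the HDFS
    // location where the driver/AM drops refreshed delegation token files.
    val sparkConf = new SparkConf()
      .set("spark.yarn.credentials.file", "hdfs:///user/spark/.sparkStaging/app_01/credentials")
    val updater = new ExecutorDelegationTokenUpdater(sparkConf, new Configuration())

    updater.updateCredentialsIfRequired()  // pick up any new tokens now, then self-schedule
    // ... executor does its work ...
    updater.stop()                         // shut down the refresh thread on exit
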
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index cfaebf9ea5..00194fba84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,12 +17,16 @@
package org.apache.spark.deploy
+import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}
+import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,6 +36,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
+import scala.concurrent.duration._
/**
* :: DeveloperApi ::
@@ -39,7 +44,8 @@ import scala.collection.JavaConversions._
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
- val conf: Configuration = newConfiguration(new SparkConf())
+ private val sparkConf = new SparkConf()
+ val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)
/**
@@ -201,6 +207,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}
+ /**
+   * Lists all the files in a directory whose names start with the given prefix and do not end
+   * with the given suffix. The returned {{FileStatus}} instances are sorted by the modification
+   * times of the respective files.
+ */
+ def listFilesSorted(
+ remoteFs: FileSystem,
+ dir: Path,
+ prefix: String,
+ exclusionSuffix: String): Array[FileStatus] = {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
+ }
+ })
+ fileStatuses
+ }
+
+ /**
+   * How much time (in millis) remains from now until (fraction * renewal interval) has elapsed
+   * for the token that stays valid the longest.
+   * This returns a negative (or zero) value if that fraction of the validity has already expired.
+ */
+ def getTimeFromNowToRenewal(
+ sparkConf: SparkConf,
+ fraction: Double,
+ credentials: Credentials): Long = {
+ val now = System.currentTimeMillis()
+
+ val renewalInterval =
+ sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+ credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .map { t =>
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+ }.foldLeft(0L)(math.max)
+ }
+
+
+ private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+ val fileName = credentialsPath.getName
+ fileName.substring(
+ fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+ }
+
+
private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
/**
@@ -251,6 +312,10 @@ object SparkHadoopUtil {
}
}
+ val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+ val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
def get: SparkHadoopUtil = {
hadoop
}
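
(A sketch of the file-naming convention these helpers appear to assume; the driver-side writer of the token files is not part of this core/ diff, so the names and locations below are hypothetical. Token files are expected to be named <credentialsFile>-<counter>, per SPARK_YARN_CREDS_COUNTER_DELIM, and to carry the ".tmp" extension while still being written, which is why readers pass it as the exclusion suffix. getSuffixForCredentialsPath is private[spark], so this snippet assumes it runs inside that package.)

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.deploy.SparkHadoopUtil

    val credentialsFile = new Path("hdfs:///user/spark/credentials")  // hypothetical location
    val fs = FileSystem.get(SparkHadoopUtil.get.conf)

    // Newest non-".tmp" token file, by modification time.
    val latest = SparkHadoopUtil.get.listFilesSorted(
        fs, credentialsFile.getParent, credentialsFile.getName,
        SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
      .lastOption

    // For a file named "credentials-3", the counter suffix parses to 3.
    val suffix = latest.map(s => SparkHadoopUtil.get.getSuffixForCredentialsPath(s.getPath))
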
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0d149e703a..4b89ede427 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,6 +401,10 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+ // Yarn client or cluster
+ OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+ OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c621b8fc86..c0e4c77190 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
+ var principal: String = null
+ var keytab: String = null
// Standalone cluster mode only
var supervise: Boolean = false
@@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PROXY_USER =>
proxyUser = value
+ case PRINCIPAL =>
+ principal = value
+
+ case KEYTAB =>
+ keytab = value
+
case HELP =>
printUsageAndExit(0)
@@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
+ | --principal PRINCIPAL Principal to be used to login to KDC, while running on
+ | secure HDFS.
+ | --keytab KEYTAB The full path to the file that contains the keytab for the
+ | principal specified above. This keytab will be copied to
+ | the node running the Application Master via the Secure
+ | Distributed Cache, for renewing the login tickets and the
+ | delegation tokens periodically.
""".stripMargin
)
SparkSubmit.exitFn()
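
(The YARN-side consumer of --principal and --keytab is not part of this core/ diff; presumably the Application Master uses the pair for a keytab-based UGI login before re-obtaining HDFS delegation tokens. A minimal sketch of such a login, with hypothetical values:)

    import org.apache.hadoop.security.UserGroupInformation

    val principal = "spark/some-host@EXAMPLE.COM"           // hypothetical principal
    val keytabPath = "/etc/security/keytabs/spark.keytab"   // hypothetical keytab location
    UserGroupInformation.loginUserFromKeytab(principal, keytabPath)
    val creds = UserGroupInformation.getCurrentUser.getCredentials
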
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 79aed90b53..9e0f06ab76 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
import org.apache.spark.rpc._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,6 +168,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
+ var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
+ if (driverConf.contains("spark.yarn.credentials.file")) {
+ logInfo("Will periodically update credentials from: " +
+ driverConf.get("spark.yarn.credentials.file"))
+ // Periodically update the credentials for this user to ensure HDFS tokens get updated.
+ tokenUpdaterOption =
+ Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
+ tokenUpdaterOption.get.updateCredentialsIfRequired()
+ }
+
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
@@ -183,6 +193,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
+ tokenUpdaterOption.foreach(_.stop())
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7352fa1fe9..f107148f3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
+
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
+
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
-
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
properties += ((key, value))
}
}
+
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))