author     Patrick Wendell <patrick@databricks.com>  2015-04-30 14:59:20 -0700
committer  Patrick Wendell <patrick@databricks.com>  2015-04-30 14:59:20 -0700
commit     e0628f2fae7f99d096f9dd625876a60d11020d9b (patch)
tree       33739db91af80b01dddf1935f24a73a6267a1a43 /core/src
parent     adbdb19a7d2cc939795f0cecbdc07c605dc946c1 (diff)
Revert "[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS"
This reverts commit 6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.
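For context, the feature being reverted let executors periodically pick up fresh HDFS delegation tokens written out by the driver, so long-running applications keep working on secure (Kerberized) clusters. Below is a minimal sketch of that refresh loop, distilled from the removed ExecutorDelegationTokenUpdater shown in the diff; the command-line path argument and the fixed one-hour delay are illustrative stand-ins (the real code reads spark.yarn.credentials.file, picks the newest credentials file by suffix, and derives the delay from the token renewal interval).

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object TokenRefreshSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def main(args: Array[String]): Unit = {
    // Illustrative: path of the tokens file the driver keeps up to date.
    val credentialsPath = new Path(args(0))
    val fs = FileSystem.get(new Configuration())
    scheduleRefresh(fs, credentialsPath, delaySeconds = 0)
  }

  private def scheduleRefresh(fs: FileSystem, path: Path, delaySeconds: Long): Unit = {
    scheduler.schedule(new Runnable {
      override def run(): Unit = {
        if (fs.exists(path)) {
          // Deserialize the tokens and merge them into the current user's UGI,
          // which is what makes subsequent HDFS calls use the renewed credentials.
          val in = fs.open(path)
          try {
            val creds = new Credentials()
            creds.readTokenStorageStream(in)
            UserGroupInformation.getCurrentUser.addCredentials(creds)
          } finally {
            in.close()
          }
        }
        // Re-check later; the removed code schedules the next run at ~80% of the
        // configured renewal interval and retries in an hour on errors.
        scheduleRefresh(fs, path, delaySeconds = 3600)
      }
    }, delaySeconds, TimeUnit.SECONDS)
  }
}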
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala          105
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala                          69
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                               4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala                     15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala           13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  4
6 files changed, 4 insertions, 206 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
deleted file mode 100644
index 80363aa3a1..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-import scala.util.control.NonFatal
-
-private[spark] class ExecutorDelegationTokenUpdater(
- sparkConf: SparkConf,
- hadoopConf: Configuration) extends Logging {
-
- @volatile private var lastCredentialsFileSuffix = 0
-
- private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
-
- private val delegationTokenRenewer =
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
- // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
- private val executorUpdaterRunnable =
- new Runnable {
- override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
- }
-
- def updateCredentialsIfRequired(): Unit = {
- try {
- val credentialsFilePath = new Path(credentialsFile)
- val remoteFs = FileSystem.get(hadoopConf)
- SparkHadoopUtil.get.listFilesSorted(
- remoteFs, credentialsFilePath.getParent,
- credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .lastOption.foreach { credentialsStatus =>
- val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
- if (suffix > lastCredentialsFileSuffix) {
- logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
- val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
- lastCredentialsFileSuffix = suffix
- UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
- logInfo("Tokens updated from credentials file.")
- } else {
- // Check every hour to see if new credentials arrived.
- logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
- "tokens yet, will check again in an hour.")
- delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
- return
- }
- }
- val timeFromNowToRenewal =
- SparkHadoopUtil.get.getTimeFromNowToRenewal(
- sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
- if (timeFromNowToRenewal <= 0) {
- executorUpdaterRunnable.run()
- } else {
- logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
- delegationTokenRenewer.schedule(
- executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
- }
- } catch {
- // Since the file may get deleted while we are reading it, catch the Exception and come
- // back in an hour to try again
- case NonFatal(e) =>
- logWarning("Error while trying to update credentials, will try again in 1 hour", e)
- delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
- }
- }
-
- private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
- val stream = remoteFs.open(tokenPath)
- try {
- val newCredentials = new Credentials()
- newCredentials.readTokenStorageStream(stream)
- newCredentials
- } finally {
- stream.close()
- }
- }
-
- def stop(): Unit = {
- delegationTokenRenewer.shutdown()
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 00194fba84..cfaebf9ea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,16 +17,12 @@
package org.apache.spark.deploy
-import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
-import java.util.{Arrays, Comparator}
-import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -36,7 +32,6 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
-import scala.concurrent.duration._
/**
* :: DeveloperApi ::
@@ -44,8 +39,7 @@ import scala.concurrent.duration._
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
- private val sparkConf = new SparkConf()
- val conf: Configuration = newConfiguration(sparkConf)
+ val conf: Configuration = newConfiguration(new SparkConf())
UserGroupInformation.setConfiguration(conf)
/**
@@ -207,61 +201,6 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}
- /**
- * Lists all the files in a directory with the specified prefix, and does not end with the
- * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
- * the respective files.
- */
- def listFilesSorted(
- remoteFs: FileSystem,
- dir: Path,
- prefix: String,
- exclusionSuffix: String): Array[FileStatus] = {
- val fileStatuses = remoteFs.listStatus(dir,
- new PathFilter {
- override def accept(path: Path): Boolean = {
- val name = path.getName
- name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
- }
- })
- Arrays.sort(fileStatuses, new Comparator[FileStatus] {
- override def compare(o1: FileStatus, o2: FileStatus): Int = {
- Longs.compare(o1.getModificationTime, o2.getModificationTime)
- }
- })
- fileStatuses
- }
-
- /**
- * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
- * is valid the latest)?
- * This will return -ve (or 0) value if the fraction of validity has already expired.
- */
- def getTimeFromNowToRenewal(
- sparkConf: SparkConf,
- fraction: Double,
- credentials: Credentials): Long = {
- val now = System.currentTimeMillis()
-
- val renewalInterval =
- sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
-
- credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- .map { t =>
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- (identifier.getIssueDate + fraction * renewalInterval).toLong - now
- }.foldLeft(0L)(math.max)
- }
-
-
- private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
- val fileName = credentialsPath.getName
- fileName.substring(
- fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
- }
-
-
private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
/**
@@ -312,10 +251,6 @@ object SparkHadoopUtil {
}
}
- val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
-
- val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
-
def get: SparkHadoopUtil = {
hadoop
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4b89ede427..0d149e703a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,10 +401,6 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
- // Yarn client or cluster
- OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
- OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
-
// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c0e4c77190..c621b8fc86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,8 +63,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
- var principal: String = null
- var keytab: String = null
// Standalone cluster mode only
var supervise: Boolean = false
@@ -395,12 +393,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PROXY_USER =>
proxyUser = value
- case PRINCIPAL =>
- principal = value
-
- case KEYTAB =>
- keytab = value
-
case HELP =>
printUsageAndExit(0)
@@ -514,13 +506,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
- | --principal PRINCIPAL Principal to be used to login to KDC, while running on
- | secure HDFS.
- | --keytab KEYTAB The full path to the file that contains the keytab for the
- | principal specified above. This keytab will be copied to
- | the node running the Application Master via the Secure
- | Distributed Cache, for renewing the login tickets and the
- | delegation tokens periodically.
""".stripMargin
)
SparkSubmit.exitFn()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9e0f06ab76..79aed90b53 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
import org.apache.spark.rpc._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,16 +168,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
- var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
- if (driverConf.contains("spark.yarn.credentials.file")) {
- logInfo("Will periodically update credentials from: " +
- driverConf.get("spark.yarn.credentials.file"))
- // Periodically update the credentials for this user to ensure HDFS tokens get updated.
- tokenUpdaterOption =
- Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
- tokenUpdaterOption.get.updateCredentialsIfRequired()
- }
-
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
@@ -193,7 +183,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
- tokenUpdaterOption.foreach(_.stop())
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f107148f3b..7352fa1fe9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,7 +68,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
-
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -113,7 +112,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
-
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -124,6 +122,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
+
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -244,7 +243,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
properties += ((key, value))
}
}
-
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))