diff options
author | Thomas Graves <tgraves@apache.org> | 2014-03-06 18:27:50 -0600 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-03-06 18:27:50 -0600 |
commit | 7edbea41b43e0dc11a2de156be220db8b7952d01 (patch) | |
tree | 1e7156df70131660d28994795eb3df2c3fd0a819 /yarn | |
parent | 40566e10aae4b21ffc71ea72702b8df118ac5c8e (diff) | |
download | spark-7edbea41b43e0dc11a2de156be220db8b7952d01.tar.gz spark-7edbea41b43e0dc11a2de156be220db8b7952d01.tar.bz2 spark-7edbea41b43e0dc11a2de156be220db8b7952d01.zip |
SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI use servlets
resubmit pull request. was https://github.com/apache/incubator-spark/pull/332.
Author: Thomas Graves <tgraves@apache.org>
Closes #33 from tgravescs/security-branch-0.9-with-client-rebase and squashes the following commits:
dfe3918 [Thomas Graves] Fix merge conflict since startUserClass now using runAsUser
05eebed [Thomas Graves] Fix dependency lost in upmerge
d1040ec [Thomas Graves] Fix up various imports
05ff5e0 [Thomas Graves] Fix up imports after upmerging to master
ac046b3 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase
13733e1 [Thomas Graves] Pass securityManager and SparkConf around where we can. Switch to use sparkConf for reading config whereever possible. Added ConnectionManagerSuite unit tests.
4a57acc [Thomas Graves] Change UI createHandler routines to createServlet since they now return servlets
2f77147 [Thomas Graves] Rework from comments
50dd9f2 [Thomas Graves] fix header in SecurityManager
ecbfb65 [Thomas Graves] Fix spacing and formatting
b514bec [Thomas Graves] Fix reference to config
ed3d1c1 [Thomas Graves] Add security.md
6f7ddf3 [Thomas Graves] Convert SaslClient and SaslServer to scala, change spark.authenticate.ui to spark.ui.acls.enable, and fix up various other things from review comments
2d9e23e [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase_rework
5721c5a [Thomas Graves] update AkkaUtilsSuite test for the actorSelection changes, fix typos based on comments, and remove extra lines I missed in rebase from AkkaUtils
f351763 [Thomas Graves] Add Security to Spark - Akka, Http, ConnectionManager, UI to use servlets
Diffstat (limited to 'yarn')
6 files changed, 77 insertions, 33 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e045b9f024..bb574f4152 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -27,7 +27,6 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ @@ -36,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -87,27 +86,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts resourceManager = registerWithResourceManager() - // Workaround until hadoop moves to something which has - // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line) - // ignore result. - // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times - // Hence args.workerCores = numCore disabled above. Any better option? - - // Compute number of threads for akka - //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() - //if (minimumMemory > 0) { - // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD - // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) - - // if (numCore > 0) { - // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 - // TODO: Uncomment when hadoop is on a version which has this fixed. - // args.workerCores = numCore - // } - //} - // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf) + // setup AmIpFilter for the SparkUI - do this before we start the UI + addAmIpFilter() ApplicationMaster.register(this) + + // Call this to force generation of secret so it gets populated into the + // hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the worker containers. + val securityMgr = new SecurityManager(sparkConf) + // Start the user's JAR userThread = startUserClass() @@ -132,6 +120,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, System.exit(0) } + // add the yarn amIpFilter that Yarn requires for properly securing the UI + private def addAmIpFilter() { + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + System.setProperty("spark.ui.filters", amFilter) + val proxy = YarnConfiguration.getProxyHostAndPort(conf) + val parts : Array[String] = proxy.split(":") + val uriBase = "http://" + proxy + + System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + + val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", + params) + } + /** Get the Yarn approved local directories. */ private def getLocalDirs(): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 138c27910b..b735d01df8 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -50,8 +50,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar private var yarnAllocator: YarnAllocationHandler = _ private var driverClosed:Boolean = false + val securityManager = new SecurityManager(sparkConf) val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf)._1 + conf = sparkConf, securityManager = securityManager)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -110,6 +111,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index fe37168e5a..11322b1202 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -134,7 +134,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + + " --worker-cores NUM Number of cores for the workers (Default: 1).\n" + " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index d6c12a9f59..4c6e1dcd6d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,11 +17,13 @@ package org.apache.spark.deploy.yarn -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration +import org.apache.spark.deploy.SparkHadoopUtil /** * Contains util methods to interact with Hadoop from spark. @@ -44,4 +46,24 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val jobCreds = conf.getCredentials() jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } + + override def getCurrentUserCredentials(): Credentials = { + UserGroupInformation.getCurrentUser().getCredentials() + } + + override def addCurrentUserCredentials(creds: Credentials) { + UserGroupInformation.getCurrentUser().addCredentials(creds) + } + + override def addSecretKeyToUserCredentials(key: String, secret: String) { + val creds = new Credentials() + creds.addSecretKey(new Text(key), secret.getBytes()) + addCurrentUserCredentials(creds) + } + + override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { + val credentials = getCurrentUserCredentials() + if (credentials != null) credentials.getSecretKey(new Text(key)) else null + } + } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index dd117d5810..b48a2d50db 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -27,7 +27,6 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.protocolrecords._ @@ -37,8 +36,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -91,12 +91,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, amClient.init(yarnConf) amClient.start() - // Workaround until hadoop moves to something which has - // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line) - // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf) + // setup AmIpFilter for the SparkUI - do this before we start the UI + addAmIpFilter() ApplicationMaster.register(this) + // Call this to force generation of secret so it gets populated into the + // hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the worker containers. + val securityMgr = new SecurityManager(sparkConf) + // Start the user's JAR userThread = startUserClass() @@ -121,6 +125,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, System.exit(0) } + // add the yarn amIpFilter that Yarn requires for properly securing the UI + private def addAmIpFilter() { + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + System.setProperty("spark.ui.filters", amFilter) + val proxy = WebAppUtils.getProxyHostAndPort(conf) + val parts : Array[String] = proxy.split(":") + val uriBase = "http://" + proxy + + System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + + val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) + } + /** Get the Yarn approved local directories. */ private def getLocalDirs(): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the @@ -261,7 +278,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) - // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 40600f38e5..f1c1fea0b5 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -52,8 +52,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar private var amClient: AMRMClient[ContainerRequest] = _ + val securityManager = new SecurityManager(sparkConf) val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf)._1 + conf = sparkConf, securityManager = securityManager)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -105,6 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val interval = math.min(timeoutInterval / 2, schedulerInterval) reporterThread = launchReporterThread(interval) + // Wait for the reporter thread to Finish. reporterThread.join() |