path: root/yarn
author     Thomas Graves <tgraves@apache.org>    2014-03-06 18:27:50 -0600
committer  Thomas Graves <tgraves@apache.org>    2014-03-06 18:27:50 -0600
commit     7edbea41b43e0dc11a2de156be220db8b7952d01 (patch)
tree       1e7156df70131660d28994795eb3df2c3fd0a819 /yarn
parent     40566e10aae4b21ffc71ea72702b8df118ac5c8e (diff)
download   spark-7edbea41b43e0dc11a2de156be220db8b7952d01.tar.gz
           spark-7edbea41b43e0dc11a2de156be220db8b7952d01.tar.bz2
           spark-7edbea41b43e0dc11a2de156be220db8b7952d01.zip
SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI use servlets
resubmit pull request. was https://github.com/apache/incubator-spark/pull/332.

Author: Thomas Graves <tgraves@apache.org>

Closes #33 from tgravescs/security-branch-0.9-with-client-rebase and squashes the following commits:

dfe3918 [Thomas Graves] Fix merge conflict since startUserClass now using runAsUser
05eebed [Thomas Graves] Fix dependency lost in upmerge
d1040ec [Thomas Graves] Fix up various imports
05ff5e0 [Thomas Graves] Fix up imports after upmerging to master
ac046b3 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase
13733e1 [Thomas Graves] Pass securityManager and SparkConf around where we can. Switch to use sparkConf for reading config wherever possible. Added ConnectionManagerSuite unit tests.
4a57acc [Thomas Graves] Change UI createHandler routines to createServlet since they now return servlets
2f77147 [Thomas Graves] Rework from comments
50dd9f2 [Thomas Graves] fix header in SecurityManager
ecbfb65 [Thomas Graves] Fix spacing and formatting
b514bec [Thomas Graves] Fix reference to config
ed3d1c1 [Thomas Graves] Add security.md
6f7ddf3 [Thomas Graves] Convert SaslClient and SaslServer to scala, change spark.authenticate.ui to spark.ui.acls.enable, and fix up various other things from review comments
2d9e23e [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase_rework
5721c5a [Thomas Graves] update AkkaUtilsSuite test for the actorSelection changes, fix typos based on comments, and remove extra lines I missed in rebase from AkkaUtils
f351763 [Thomas Graves] Add Security to Spark - Akka, Http, ConnectionManager, UI to use servlets
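The commit message above renames spark.authenticate.ui to spark.ui.acls.enable; together with the spark.authenticate switch that the new SecurityManager is built around (an assumption based on the security.md this commit adds, not shown in the yarn-only diff below), these are ordinary SparkConf settings. A minimal sketch of enabling them; the application name and values are placeholders, not from the patch:

import org.apache.spark.SparkConf

// Hedged illustration only: "secure-app" is a placeholder name.
val conf = new SparkConf()
  .setAppName("secure-app")
  .set("spark.authenticate", "true")     // enable shared-secret authentication between Spark daemons
  .set("spark.ui.acls.enable", "true")   // enable access-control checks on the web UI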
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala    44
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala        6
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala      2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala  24
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala   28
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala       6
6 files changed, 77 insertions, 33 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e045b9f024..bb574f4152 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -36,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -87,27 +86,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // ignore result.
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option?
-
- // Compute number of threads for akka
- //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- //if (minimumMemory > 0) {
- // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- // if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- // }
- //}
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ // setup AmIpFilter for the SparkUI - do this before we start the UI
+ addAmIpFilter()
ApplicationMaster.register(this)
+
+ // Call this to force generation of secret so it gets populated into the
+ // hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the worker containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
// Start the user's JAR
userThread = startUserClass()
@@ -132,6 +120,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.exit(0)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ System.setProperty("spark.ui.filters", amFilter)
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts : Array[String] = proxy.split(":")
+ val uriBase = "http://" + proxy +
+ System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
+ params)
+ }
+
/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
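The addAmIpFilter helper added above leans on a general convention: the Spark UI picks up servlet filter class names from spark.ui.filters, and each filter's init parameters from spark.<filter class>.params as comma-separated KEY=VALUE pairs. A minimal sketch of that wiring with a made-up proxy address; in the patch the real values come from YarnConfiguration.getProxyHostAndPort and the APPLICATION_WEB_PROXY_BASE_ENV environment variable:

// Illustrative only: "proxy.example.com:8088" and "/proxy/app_0001" are placeholders.
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val proxy = "proxy.example.com:8088"
val proxyHost = proxy.split(":")(0)
val uriBase = "http://" + proxy + "/proxy/app_0001"

System.setProperty("spark.ui.filters", amFilter)
System.setProperty("spark." + amFilter + ".params",
  "PROXY_HOST=" + proxyHost + ",PROXY_URI_BASE=" + uriBase)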
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 138c27910b..b735d01df8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
@@ -50,8 +50,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false
+ val securityManager = new SecurityManager(sparkConf)
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
+ conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
@@ -110,6 +111,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fe37168e5a..11322b1202 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -134,7 +134,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+ " --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
" --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d6c12a9f59..4c6e1dcd6d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,11 +17,13 @@
package org.apache.spark.deploy.yarn
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* Contains util methods to interact with Hadoop from spark.
@@ -44,4 +46,24 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
+
+ override def getCurrentUserCredentials(): Credentials = {
+ UserGroupInformation.getCurrentUser().getCredentials()
+ }
+
+ override def addCurrentUserCredentials(creds: Credentials) {
+ UserGroupInformation.getCurrentUser().addCredentials(creds)
+ }
+
+ override def addSecretKeyToUserCredentials(key: String, secret: String) {
+ val creds = new Credentials()
+ creds.addSecretKey(new Text(key), secret.getBytes())
+ addCurrentUserCredentials(creds)
+ }
+
+ override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+ val credentials = getCurrentUserCredentials()
+ if (credentials != null) credentials.getSecretKey(new Text(key)) else null
+ }
+
}
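A short usage sketch of the new credential helpers (not part of the patch): instantiate the YARN utility class directly, store a secret in the current user's Hadoop credentials, and read it back from the same UGI. The key name and secret value are placeholders.

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val util = new YarnSparkHadoopUtil()
util.addSecretKeyToUserCredentials("sparkCookie", "not-a-real-secret")   // placeholder key and value
val secret: Array[Byte] = util.getSecretKeyFromUserCredentials("sparkCookie")
assert(secret != null)   // same UGI, so the stored key is visible again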
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dd117d5810..b48a2d50db 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -37,8 +36,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -91,12 +91,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
amClient.init(yarnConf)
amClient.start()
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ // setup AmIpFilter for the SparkUI - do this before we start the UI
+ addAmIpFilter()
ApplicationMaster.register(this)
+ // Call this to force generation of secret so it gets populated into the
+ // hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the worker containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
// Start the user's JAR
userThread = startUserClass()
@@ -121,6 +125,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.exit(0)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ System.setProperty("spark.ui.filters", amFilter)
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts : Array[String] = proxy.split(":")
+ val uriBase = "http://" + proxy +
+ System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+ }
+
/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
@@ -261,7 +278,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val schedulerInterval =
sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 40600f38e5..f1c1fea0b5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
@@ -52,8 +52,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var amClient: AMRMClient[ContainerRequest] = _
+ val securityManager = new SecurityManager(sparkConf)
val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
+ conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
@@ -105,6 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val interval = math.min(timeoutInterval / 2, schedulerInterval)
reporterThread = launchReporterThread(interval)
+
// Wait for the reporter thread to Finish.
reporterThread.join()
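Both WorkerLauncher changes follow the same pattern: construct the SecurityManager first, then hand it to AkkaUtils.createActorSystem so the resulting actor system enforces the shared secret. A sketch of that wiring, mirroring the call shown in the diff; note that SecurityManager, AkkaUtils and Utils are Spark-internal classes, so this only compiles inside Spark's own source tree (as it does in these launchers), and the createActorSystem signature is taken from the diff rather than independently verified:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.{AkkaUtils, Utils}

val sparkConf = new SparkConf().set("spark.authenticate", "true")
val securityManager = new SecurityManager(sparkConf)
// createActorSystem returns (ActorSystem, boundPort); named arguments as in the patch.
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
  conf = sparkConf, securityManager = securityManager)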