diff options
author | Thomas Graves <tgraves@apache.org> | 2014-09-05 09:54:40 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-09-05 09:54:40 -0500 |
commit | 51b53a758c85f2e20ad9bd73ed815fcfa9c7180b (patch) | |
tree | fd977454224b99d90f8b3148ce94ced298568230 /yarn/common/src/main/scala | |
parent | 6a37ed838b3cbf96f7a904f3d3dabf99141729f5 (diff) | |
download | spark-51b53a758c85f2e20ad9bd73ed815fcfa9c7180b.tar.gz spark-51b53a758c85f2e20ad9bd73ed815fcfa9c7180b.tar.bz2 spark-51b53a758c85f2e20ad9bd73ed815fcfa9c7180b.zip |
[SPARK-3260] yarn - pass acls along with executor launch
Pass along the acl settings when we launch a container so that they can be applied to viewing the logs on a running NodeManager.
Author: Thomas Graves <tgraves@apache.org>
Closes #2185 from tgravescs/SPARK-3260 and squashes the following commits:
6f94b5a [Thomas Graves] make unit test more robust
28b9dd3 [Thomas Graves] yarn - pass acls along with executor launch
Diffstat (limited to 'yarn/common/src/main/scala')
5 files changed, 28 insertions, 17 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 98039a20de..a879c833a0 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val securityMgr = new SecurityManager(sparkConf) if (isDriver) { - runDriver() + runDriver(securityMgr) } else { runExecutorLauncher(securityMgr) } @@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.compareAndSet(sc, null) } - private def registerAM(uiAddress: String) = { + private def registerAM(uiAddress: String, securityMgr: SecurityManager) = { val sc = sparkContextRef.get() val appId = client.getAttemptId().getApplicationId().toString() @@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc != null) sc.getConf else sparkConf, if (sc != null) sc.preferredNodeLocationData else Map(), uiAddress, - historyAddress) + historyAddress, + securityMgr) allocator.allocateResources() reporterThread = launchReporterThread() } - private def runDriver(): Unit = { + private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() val userThread = startUserClass() @@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc == null) { finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") } else { - registerAM(sc.ui.appUIHostPort) + registerAM(sc.ui.appUIHostPort, securityMgr) try { userThread.join() } finally { @@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, conf = sparkConf, securityManager = securityMgr)._1 actor = waitForSparkDriver() addAmIpFilter() - registerAM(sparkConf.get("spark.driver.appUIAddress", "")) + registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) // In client mode the actor will stop the reporter thread. reporterThread.join() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 5d8e5e6dff..8075b7a7fb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -430,10 +430,8 @@ trait ClientBase extends Logging { // send the acl settings into YARN to control who has access via YARN interfaces val securityManager = new SecurityManager(sparkConf) - val acls = Map[ApplicationAccessType, String] ( - ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls, - ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls) - amContainer.setApplicationACLs(acls) + amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager)) + amContainer } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index c74dd1c2b2..02b9a81bf6 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.spark.{Logging, SparkConf, SparkEnv} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator( conf: Configuration, sparkConf: SparkConf, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) extends Logging { // These three are locked on allocatedHostToContainersMap. Complementary data structures @@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator( executorId, executorHostname, executorMemory, - executorCores) + executorCores, + securityMgr) new Thread(executorRunnable).start() } } @@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator( } -}
\ No newline at end of file +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 922d7d1a85..ed65e56b3e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -22,7 +22,7 @@ import scala.collection.{Map, Set} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records._ -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler.SplitInfo /** @@ -45,7 +45,8 @@ trait YarnRMClient { sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String): YarnAllocator + uiHistoryAddress: String, + securityMgr: SecurityManager): YarnAllocator /** * Shuts down the AM. Guaranteed to only be called once. diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index dc77f12364..4a33e34c3b 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -211,4 +212,12 @@ object YarnSparkHadoopUtil { } } + private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager): + Map[ApplicationAccessType, String] = { + Map[ApplicationAccessType, String] ( + ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, + ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls + ) + } + } |