aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-02-28 12:43:01 -0600
committerThomas Graves <tgraves@apache.org>2014-02-28 12:43:01 -0600
commit46dff34458096e5330073ca58e0723da52aeddcd (patch)
tree77b7d1c1c7798bd9089538f4bee90b7c9257c253 /yarn
parent5f419bf9f433e8f057237f1d5bfed9f5f4e9427c (diff)
downloadspark-46dff34458096e5330073ca58e0723da52aeddcd.tar.gz
spark-46dff34458096e5330073ca58e0723da52aeddcd.tar.bz2
spark-46dff34458096e5330073ca58e0723da52aeddcd.zip
SPARK-1051. On YARN, executors don't doAs submitting user
This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo Author: Sandy Ryza <sandy@cloudera.com> Closes #29 from sryza/sandy-spark-1051 and squashes the following commits: 708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala6
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala1
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala4
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala6
4 files changed, 15 insertions, 2 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 560e5de358..e045b9f024 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false
+ private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+ SparkContext.SPARK_UNKNOWN_USER)
+
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
@@ -180,7 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run() {
+ override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
var successed = false
try {
// Copy
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 24520bd21b..4b6c7db836 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -272,6 +272,7 @@ trait ClientBase extends Logging {
ClientBase.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
+ env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
// Set the environment variables to be passed on to the Workers.
distCacheMgr.setDistFilesEnv(env)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2ba2366ead..d6c12a9f59 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
+ override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+ dest.addCredentials(source.getCredentials())
+ }
+
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
override def isYarnMode(): Boolean = { true }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0f58c49c69..dd117d5810 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -68,6 +69,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
math.max(args.numWorkers * 2, 3))
private var registered = false
+
+ private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+ SparkContext.SPARK_UNKNOWN_USER)
def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -152,7 +156,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run() {
+ override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
var successed = false
try {
// Copy