aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-02-28 12:43:01 -0600
committerThomas Graves <tgraves@apache.org>2014-02-28 12:43:01 -0600
commit46dff34458096e5330073ca58e0723da52aeddcd (patch)
tree77b7d1c1c7798bd9089538f4bee90b7c9257c253 /core
parent5f419bf9f433e8f057237f1d5bfed9f5f4e9427c (diff)
downloadspark-46dff34458096e5330073ca58e0723da52aeddcd.tar.gz
spark-46dff34458096e5330073ca58e0723da52aeddcd.tar.bz2
spark-46dff34458096e5330073ca58e0723da52aeddcd.zip
SPARK-1051. On YARN, executors don't doAs submitting user
This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo Author: Sandy Ryza <sandy@cloudera.com> Closes #29 from sryza/sandy-spark-1051 and squashes the following commits: 708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala18
1 files changed, 10 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index b479225b45..ec15647e1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkContext, SparkException}
+import scala.collection.JavaConversions._
+
/**
* Contains util methods to interact with Hadoop from Spark.
*/
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
UserGroupInformation.setConfiguration(conf)
def runAsUser(user: String)(func: () => Unit) {
- // if we are already running as the user intended there is no reason to do the doAs. It
- // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
- // the user is UNKNOWN then we shouldn't be creating a remote unknown user
- // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
- // in SparkContext.
- val currentUser = Option(System.getProperty("user.name")).
- getOrElse(SparkContext.SPARK_UNKNOWN_USER)
- if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+ if (user != SparkContext.SPARK_UNKNOWN_USER) {
val ugi = UserGroupInformation.createRemoteUser(user)
+ transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
}
}
+ def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+ for (token <- source.getTokens()) {
+ dest.addToken(token)
+ }
+ }
+
/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.