Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 31 |
1 file changed, 27 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6bc846aa92..fc1537f796 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,16 +17,39 @@
 
 package org.apache.spark.deploy
 
+import java.security.PrivilegedExceptionAction
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
 private[spark]
 class SparkHadoopUtil {
+  val conf = newConfiguration()
+  UserGroupInformation.setConfiguration(conf)
+
+  def runAsUser(user: String)(func: () => Unit) {
+    // if we are already running as the user intended there is no reason to do the doAs. It
+    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
+  }
 
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -42,9 +65,9 @@ class SparkHadoopUtil {
 
   def isYarnMode(): Boolean = { false }
 }
- 
+
 object SparkHadoopUtil {
-  private val hadoop = { 
+  private val hadoop = {
     val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
     if (yarnMode) {
       try {
@@ -56,7 +79,7 @@ object SparkHadoopUtil {
       new SparkHadoopUtil
     }
   }
- 
+
   def get: SparkHadoopUtil = {
     hadoop
   }
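
For context, the sketch below shows how the new runAsUser hook is intended to be called: a caller passes the target user (typically carried in SPARK_USER) and a thunk, and the body runs inside UserGroupInformation.doAs so that Hadoop filesystem calls are attributed to that user. The sketch is not part of this commit; the object name, the HDFS path, and the use of FileSystem are illustrative assumptions only.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical caller, for illustration only (not in this patch).
object RunAsUserExample {
  def main(args: Array[String]): Unit = {
    // The patch expects SPARK_USER to carry the intended identity; fall back to the JVM user.
    val targetUser = sys.env.getOrElse("SPARK_USER", System.getProperty("user.name"))
    SparkHadoopUtil.get.runAsUser(targetUser) { () =>
      // Everything in this thunk executes under UserGroupInformation.doAs when
      // targetUser differs from the current user, so HDFS access uses that identity.
      val fs = FileSystem.get(SparkHadoopUtil.get.conf)
      println(fs.getFileStatus(new Path("/tmp")).getOwner)
    }
  }
}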