aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-02-10 17:19:10 -0800
committerPatrick Wendell <patrick@databricks.com>2015-02-10 17:19:10 -0800
commited167e70c6d355f39b366ea0d3b92dd26d826a0b (patch)
tree08d8fac6279a161d9d83d5aea334215094c2c2c7
parente28b6bdbb5c5e4fd62ec0b547b77719c3f7e476e (diff)
downloadspark-ed167e70c6d355f39b366ea0d3b92dd26d826a0b.tar.gz
spark-ed167e70c6d355f39b366ea0d3b92dd26d826a0b.tar.bz2
spark-ed167e70c6d355f39b366ea0d3b92dd26d826a0b.zip
[SPARK-5493] [core] Add option to impersonate user.
Hadoop has a feature that allows users to impersonate other users when submitting applications or talking to HDFS, for example. These impersonated users are referred generally as "proxy users". Services such as Oozie or Hive use this feature to run applications as the requesting user. This change makes SparkSubmit accept a new command line option to run the application as a proxy user. It also fixes the plumbing of the user name through the UI (and a couple of other places) to refer to the correct user running the application, which can be different than `sys.props("user.name")` even without proxies (e.g. when using kerberos). Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #4405 from vanzin/SPARK-5493 and squashes the following commits: df82427 [Marcelo Vanzin] Clarify the reason for the special exception handling. 05bfc08 [Marcelo Vanzin] Remove unneeded annotation. 4840de9 [Marcelo Vanzin] Review feedback. 8af06ff [Marcelo Vanzin] Fix usage string. 2e4fa8f [Marcelo Vanzin] Merge branch 'master' into SPARK-5493 b6c947d [Marcelo Vanzin] Merge branch 'master' into SPARK-5493 0540d38 [Marcelo Vanzin] [SPARK-5493] [core] Add option to impersonate user.
-rwxr-xr-xbin/utils.sh3
-rw-r--r--bin/windows-utils.cmd1
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala56
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala11
8 files changed, 82 insertions, 34 deletions
diff --git a/bin/utils.sh b/bin/utils.sh
index 2241200082..748dbe345a 100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
@@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
--master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
--conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
- --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+ --total-executor-cores | --executor-cores | --queue | --num-executors | --archives | \
+ --proxy-user)
if [[ $# -lt 2 ]]; then
"$SUBMIT_USAGE_FUNCTION"
exit 1;
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
index 567b8733f7..0cf9e87ca5 100644
--- a/bin/windows-utils.cmd
+++ b/bin/windows-utils.cmd
@@ -33,6 +33,7 @@ SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
+SET opts="%opts:~1,-1% \<--proxy-user\>"
echo %1 | findstr %opts% >nul
if %ERRORLEVEL% equ 0 (
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 88d35a4bac..3653f724ba 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Text
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.sasl.SecretKeyHolder
+import org.apache.spark.util.Utils
/**
* Spark class responsible for security.
@@ -203,7 +204,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// always add the current user and SPARK_USER to the viewAcls
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
- Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
+ Utils.getCurrentUserName())
setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 04ca5d1019..53fce6b0de 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -191,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")
-
+
private[spark] val conf = config.clone()
conf.validateSettings()
@@ -335,11 +335,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
executorEnvs ++= conf.getExecutorEnv
// Set SPARK_USER for user who is running SparkContext.
- val sparkUser = Option {
- Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
- }.getOrElse {
- SparkContext.SPARK_UNKNOWN_USER
- }
+ val sparkUser = Utils.getCurrentUserName()
executorEnvs("SPARK_USER") = sparkUser
// Create and start the scheduler
@@ -826,7 +822,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
assertNotStopped()
- // The call to new NewHadoopJob automatically adds security credentials to conf,
+ // The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
@@ -1626,8 +1622,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
- /**
- * Default min number of partitions for Hadoop RDDs when not given by user
+ /**
+ * Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
* The reasons for this are discussed in https://github.com/mesos/spark/pull/718
*/
@@ -1844,8 +1840,6 @@ object SparkContext extends Logging {
private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
- private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
-
private[spark] val DRIVER_IDENTIFIER = "<driver>"
// The following deprecated objects have already been copied to `object AccumulatorParam` to
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 03238e9fa0..e0a32fb65c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -52,18 +52,13 @@ class SparkHadoopUtil extends Logging {
* do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
*/
def runAsSparkUser(func: () => Unit) {
- val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
- if (user != SparkContext.SPARK_UNKNOWN_USER) {
- logDebug("running as user: " + user)
- val ugi = UserGroupInformation.createRemoteUser(user)
- transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
- ugi.doAs(new PrivilegedExceptionAction[Unit] {
- def run: Unit = func()
- })
- } else {
- logDebug("running as SPARK_UNKNOWN_USER")
- func()
- }
+ val user = Utils.getCurrentUserName()
+ logDebug("running as user: " + user)
+ val ugi = UserGroupInformation.createRemoteUser(user)
+ transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
+ ugi.doAs(new PrivilegedExceptionAction[Unit] {
+ def run: Unit = func()
+ })
}
def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c4bc5054d6..80cc058728 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,12 +18,14 @@
package org.apache.spark.deploy
import java.io.{File, PrintStream}
-import java.lang.reflect.{InvocationTargetException, Modifier}
+import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
+import java.security.PrivilegedExceptionAction
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
@@ -79,7 +81,7 @@ object SparkSubmit {
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// Exposed for testing
- private[spark] var exitFn: () => Unit = () => System.exit(-1)
+ private[spark] var exitFn: () => Unit = () => System.exit(1)
private[spark] var printStream: PrintStream = System.err
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
private[spark] def printErrorAndExit(str: String) = {
@@ -126,6 +128,34 @@ object SparkSubmit {
*/
private[spark] def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+
+ def doRunMain(): Unit = {
+ if (args.proxyUser != null) {
+ val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+ UserGroupInformation.getCurrentUser())
+ try {
+ proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+ }
+ })
+ } catch {
+ case e: Exception =>
+ // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+ // makes the message printed to the output by the JVM not very helpful. Instead,
+ // detect exceptions with empty stack traces here, and treat them differently.
+ if (e.getStackTrace().length == 0) {
+ printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+ exitFn()
+ } else {
+ throw e
+ }
+ }
+ } else {
+ runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+ }
+ }
+
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
@@ -134,7 +164,7 @@ object SparkSubmit {
if (args.isStandaloneCluster && args.useRest) {
try {
printStream.println("Running Spark using the REST application submission protocol.")
- runMain(childArgs, childClasspath, sysProps, childMainClass)
+ doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
@@ -145,7 +175,7 @@ object SparkSubmit {
}
// In all other modes, just run the main class as prepared
} else {
- runMain(childArgs, childClasspath, sysProps, childMainClass)
+ doRunMain()
}
}
@@ -457,7 +487,7 @@ object SparkSubmit {
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
- verbose: Boolean = false) {
+ verbose: Boolean): Unit = {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -507,13 +537,21 @@ object SparkSubmit {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
+
+ def findCause(t: Throwable): Throwable = t match {
+ case e: UndeclaredThrowableException =>
+ if (e.getCause() != null) findCause(e.getCause()) else e
+ case e: InvocationTargetException =>
+ if (e.getCause() != null) findCause(e.getCause()) else e
+ case e: Throwable =>
+ e
+ }
+
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
- case e: InvocationTargetException => e.getCause match {
- case cause: Throwable => throw cause
- case null => throw e
- }
+ case t: Throwable =>
+ throw findCause(t)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index bd0ae26fd8..fa38070c6f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -57,6 +57,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var pyFiles: String = null
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
+ var proxyUser: String = null
// Standalone cluster mode only
var supervise: Boolean = false
@@ -405,6 +406,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
parse(tail)
+ case ("--proxy-user") :: value :: tail =>
+ proxyUser = value
+ parse(tail)
+
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)
@@ -476,6 +481,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
+ | --proxy-user NAME User to impersonate when submitting the application.
+ |
| --help, -h Show this help message and exit
| --verbose, -v Print additional debug output
|
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 61d287ca9c..6af8dd555f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
@@ -1986,6 +1987,16 @@ private[spark] object Utils extends Logging {
throw new SparkException("Invalid master URL: " + sparkUrl, e)
}
}
+
+ /**
+ * Returns the current user name. This is the currently logged in user, unless that's been
+ * overridden by the `SPARK_USER` environment variable.
+ */
+ def getCurrentUserName(): String = {
+ Option(System.getenv("SPARK_USER"))
+ .getOrElse(UserGroupInformation.getCurrentUser().getUserName())
+ }
+
}
/**