-rwxr-xr-x  bin/utils.sh                                                              3
-rw-r--r--  bin/windows-utils.cmd                                                     1
-rw-r--r--  core/src/main/scala/org/apache/spark/SecurityManager.scala                3
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                  16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala        19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala            56
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala    7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                    11
8 files changed, 82 insertions, 34 deletions
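In short, spark-submit gains a --proxy-user flag: the POSIX and Windows launcher scripts learn the new option, SparkSubmitArguments parses it, and SparkSubmit wraps the application's main class in a Hadoop UserGroupInformation.doAs call, while the scattered ad-hoc SPARK_USER lookups are consolidated into a single Utils.getCurrentUserName(). A typical invocation would look something like spark-submit --proxy-user alice --class com.example.App app.jar (user and class names illustrative). Short standalone sketches of the Hadoop idioms involved follow the relevant file diffs below.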
diff --git a/bin/utils.sh b/bin/utils.sh
index 2241200082..748dbe345a 100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
@@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
     --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
     --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
     --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
-    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives | \
+    --proxy-user)
       if [[ $# -lt 2 ]]; then
         "$SUBMIT_USAGE_FUNCTION"
         exit 1;
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
index 567b8733f7..0cf9e87ca5 100644
--- a/bin/windows-utils.cmd
+++ b/bin/windows-utils.cmd
@@ -33,6 +33,7 @@ SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
 SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
+SET opts="%opts:~1,-1% \<--proxy-user\>"
 
 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 88d35a4bac..3653f724ba 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.network.sasl.SecretKeyHolder
+import org.apache.spark.util.Utils
 
 /**
  * Spark class responsible for security.
@@ -203,7 +204,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
 
   // always add the current user and SPARK_USER to the viewAcls
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
-    Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
+    Utils.getCurrentUserName())
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
   setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
 
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 04ca5d1019..53fce6b0de 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -191,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
-  
+
   private[spark] val conf = config.clone()
   conf.validateSettings()
 
@@ -335,11 +335,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   executorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
-  val sparkUser = Option {
-    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
-  }.getOrElse {
-    SparkContext.SPARK_UNKNOWN_USER
-  }
+  val sparkUser = Utils.getCurrentUserName()
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
@@ -826,7 +822,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     assertNotStopped()
-    // The call to new NewHadoopJob automatically adds security credentials to conf, 
+    // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
@@ -1626,8 +1622,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
-  /** 
-   * Default min number of partitions for Hadoop RDDs when not given by user 
+  /**
+   * Default min number of partitions for Hadoop RDDs when not given by user
    * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
    * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
    */
@@ -1844,8 +1840,6 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
 
-  private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
-
   private[spark] val DRIVER_IDENTIFIER = "<driver>"
 
   // The following deprecated objects have already been copied to `object AccumulatorParam` to
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 03238e9fa0..e0a32fb65c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -52,18 +52,13 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
-    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      logDebug("running as user: " + user)
-      val ugi = UserGroupInformation.createRemoteUser(user)
-      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-      ugi.doAs(new PrivilegedExceptionAction[Unit] {
-        def run: Unit = func()
-      })
-    } else {
-      logDebug("running as SPARK_UNKNOWN_USER")
-      func()
-    }
+    val user = Utils.getCurrentUserName()
+    logDebug("running as user: " + user)
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
   }
 
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
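For reference, the pattern the simplified runAsSparkUser now always follows (create a remote UGI for the target name, copy the current credentials over, and run the closure under doAs) can be exercised standalone. A minimal sketch, not Spark code; the user name "alice" and the object name are illustrative, and the addCredentials call stands in for transferCredentials:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Minimal standalone sketch of the runAsSparkUser pattern (not Spark code).
object RunAsUserSketch {
  // Run func as the named user, carrying the current delegation tokens over,
  // much as SparkHadoopUtil.transferCredentials does.
  def runAs(user: String)(func: () => Unit): Unit = {
    val ugi = UserGroupInformation.createRemoteUser(user)
    ugi.addCredentials(UserGroupInformation.getCurrentUser().getCredentials())
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = func()
    })
  }

  def main(args: Array[String]): Unit = {
    runAs("alice") { () =>
      println("running as: " + UserGroupInformation.getCurrentUser().getUserName())
    }
  }
}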
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c4bc5054d6..80cc058728 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,12 +18,14 @@ package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.{InvocationTargetException, Modifier}
+import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
+import java.security.PrivilegedExceptionAction
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
@@ -79,7 +81,7 @@ object SparkSubmit {
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
   // Exposed for testing
-  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+  private[spark] var exitFn: () => Unit = () => System.exit(1)
   private[spark] var printStream: PrintStream = System.err
   private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
   private[spark] def printErrorAndExit(str: String) = {
@@ -126,6 +128,34 @@ object SparkSubmit {
    */
   private[spark] def submit(args: SparkSubmitArguments): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+
+    def doRunMain(): Unit = {
+      if (args.proxyUser != null) {
+        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+          UserGroupInformation.getCurrentUser())
+        try {
+          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+            override def run(): Unit = {
+              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+            }
+          })
+        } catch {
+          case e: Exception =>
+            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+            // makes the message printed to the output by the JVM not very helpful. Instead,
+            // detect exceptions with empty stack traces here, and treat them differently.
+            if (e.getStackTrace().length == 0) {
+              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+              exitFn()
+            } else {
+              throw e
+            }
+        }
+      } else {
+        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+      }
+    }
+
     // In standalone cluster mode, there are two submission gateways:
     //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
     //   (2) The new REST-based gateway introduced in Spark 1.3
@@ -134,7 +164,7 @@ object SparkSubmit {
     if (args.isStandaloneCluster && args.useRest) {
       try {
         printStream.println("Running Spark using the REST application submission protocol.")
-        runMain(childArgs, childClasspath, sysProps, childMainClass)
+        doRunMain()
       } catch {
         // Fail over to use the legacy submission gateway
         case e: SubmitRestConnectionException =>
@@ -145,7 +175,7 @@ object SparkSubmit {
       }
     // In all other modes, just run the main class as prepared
     } else {
-      runMain(childArgs, childClasspath, sysProps, childMainClass)
+      doRunMain()
     }
   }
 
@@ -457,7 +487,7 @@ object SparkSubmit {
       childClasspath: Seq[String],
       sysProps: Map[String, String],
       childMainClass: String,
-      verbose: Boolean = false) {
+      verbose: Boolean): Unit = {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -507,13 +537,21 @@ object SparkSubmit {
     if (!Modifier.isStatic(mainMethod.getModifiers)) {
       throw new IllegalStateException("The main method in the given main class must be static")
     }
+
+    def findCause(t: Throwable): Throwable = t match {
+      case e: UndeclaredThrowableException =>
+        if (e.getCause() != null) findCause(e.getCause()) else e
+      case e: InvocationTargetException =>
+        if (e.getCause() != null) findCause(e.getCause()) else e
+      case e: Throwable =>
+        e
+    }
+
     try {
       mainMethod.invoke(null, childArgs.toArray)
     } catch {
-      case e: InvocationTargetException => e.getCause match {
-        case cause: Throwable => throw cause
-        case null => throw e
-      }
+      case t: Throwable =>
+        throw findCause(t)
     }
   }
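The impersonation wrapper in doRunMain is the standard Hadoop proxy-user idiom, and it can be sketched in isolation. A minimal standalone sketch, not Spark code; the impersonated name "alice" is illustrative, and the real caller (typically a Kerberos-authenticated superuser) must be authorized by the cluster to impersonate it:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Minimal standalone sketch of Hadoop impersonation (not Spark code).
object ProxyUserSketch {
  def main(args: Array[String]): Unit = {
    // Impersonate "alice" (illustrative) on top of the real logged-in user.
    val proxyUser = UserGroupInformation.createProxyUser(
      "alice", UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = {
          // Code here sees "alice" as the current Hadoop user.
          println(UserGroupInformation.getCurrentUser().getUserName())
        }
      })
    } catch {
      // doAs wraps checked exceptions thrown inside run() in
      // UndeclaredThrowableException, which is why findCause in the diff
      // above unwraps that type alongside InvocationTargetException.
      case e: java.lang.reflect.UndeclaredThrowableException =>
        throw Option(e.getCause()).getOrElse(e)
    }
  }
}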
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index bd0ae26fd8..fa38070c6f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -57,6 +57,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var pyFiles: String = null
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
+  var proxyUser: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -405,6 +406,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       }
       parse(tail)
 
+    case ("--proxy-user") :: value :: tail =>
+      proxyUser = value
+      parse(tail)
+
     case ("--help" | "-h") :: tail =>
      printUsageAndExit(0)
 
@@ -476,6 +481,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
        |
        |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
        |
+        |  --proxy-user NAME           User to impersonate when submitting the application.
+        |
        |  --help, -h                  Show this help message and exit
        |  --verbose, -v               Print additional debug output
        |
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 61d287ca9c..6af8dd555f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
@@ -1986,6 +1987,16 @@ private[spark] object Utils extends Logging {
       throw new SparkException("Invalid master URL: " + sparkUrl, e)
     }
   }
+
+  /**
+   * Returns the current user name. This is the currently logged in user, unless that's been
+   * overridden by the `SPARK_USER` environment variable.
+   */
+  def getCurrentUserName(): String = {
+    Option(System.getenv("SPARK_USER"))
+      .getOrElse(UserGroupInformation.getCurrentUser().getUserName())
+  }
+
 }
 
 /**
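The new Utils.getCurrentUserName() is small enough to check in isolation: the SPARK_USER environment variable wins, otherwise Hadoop reports the logged-in user. A minimal sketch mirroring that resolution order (object name illustrative):

import org.apache.hadoop.security.UserGroupInformation

// Minimal sketch of the same resolution order as Utils.getCurrentUserName().
object CurrentUserSketch {
  def currentUserName(): String =
    Option(System.getenv("SPARK_USER"))
      .getOrElse(UserGroupInformation.getCurrentUser().getUserName())

  def main(args: Array[String]): Unit =
    println("current user: " + currentUserName())
}

Note that impersonation also has to be permitted on the Hadoop side: the submitting superuser is typically whitelisted via the hadoop.proxyuser.<name>.hosts and hadoop.proxyuser.<name>.groups keys in core-site.xml. When it is not, the doAs call fails with Hadoop's AuthorizationException, whose empty stack trace is exactly the case doRunMain reports explicitly before exiting.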