diff options
Diffstat (limited to 'yarn')
4 files changed, 21 insertions, 15 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index fc13dbecb4..8f0ecb8557 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var registered = false - private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( - SparkContext.SPARK_UNKNOWN_USER) - def run() { // Setup the directories so things go to yarn approved directories rather // then user specified and /tmp. @@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => + override def run() { + var successed = false try { // Copy @@ -480,6 +478,8 @@ object ApplicationMaster { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ApplicationMaster(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ApplicationMaster(args).run() + } } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 65b7215afb..a3bd91590f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo +import org.apache.spark.deploy.SparkHadoopUtil /** * An application master that allocates executors on behalf of a driver that is running outside @@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ExecutorLauncher(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ExecutorLauncher(args).run() + } } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 90e807160d..c1dfe3f53b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false - - private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( - SparkContext.SPARK_UNKNOWN_USER) def run() { // Setup the directories so things go to YARN approved directories rather @@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => - var successed = false + override def run() { + + var successed = false try { // Copy var mainArgs: Array[String] = new Array[String](args.userArgs.size) @@ -462,6 +460,8 @@ object ApplicationMaster { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ApplicationMaster(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ApplicationMaster(args).run() + } } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a14bb377aa..a4ce8766d3 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.spark.deploy.SparkHadoopUtil /** * An application master that allocates executors on behalf of a driver that is running outside @@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ExecutorLauncher(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ExecutorLauncher(args).run() + } } } |