Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala   10
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala     7
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  12
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala    7
4 files changed, 21 insertions(+), 15 deletions(-)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index fc13dbecb4..8f0ecb8557 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false
- private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
- SparkContext.SPARK_UNKNOWN_USER)
-
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+ override def run() {
+
var successed = false
try {
// Copy
@@ -480,6 +478,8 @@ object ApplicationMaster {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ApplicationMaster(args).run()
+ }
}
}
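
The hunks above move user impersonation out of the thread that runs the user class and into the process entry point. What changes is the scope of the doAs: anything executed outside the wrapped block keeps the identity of the user that launched the container. A small standalone sketch of that scoping (hypothetical names, not part of this commit):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    object DoAsScopeDemo {
      def main(args: Array[String]) {
        // Outside any doAs block, calls are attributed to the user that launched the JVM.
        println("outside: " + UserGroupInformation.getCurrentUser.getShortUserName)

        val proxy = UserGroupInformation.createRemoteUser("spark")
        proxy.doAs(new PrivilegedExceptionAction[Unit] {
          // Inside the block, getCurrentUser resolves to the proxied "spark" user,
          // so Hadoop calls made here run under that identity.
          override def run(): Unit =
            println("inside: " + UserGroupInformation.getCurrentUser.getShortUserName)
        })
      }
    }

Wrapping main rather than just the user-class thread therefore puts the ApplicationMaster's own registration and allocation work under the same identity as the user code, which is presumably the point of the change.
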
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 65b7215afb..a3bd91590f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ExecutorLauncher(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ExecutorLauncher(args).run()
+ }
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 90e807160d..c1dfe3f53b 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
-
- private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
- SparkContext.SPARK_UNKNOWN_USER)
def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
- var successed = false
+ override def run() {
+
+ var successed = false
try {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,8 @@ object ApplicationMaster {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ApplicationMaster(args).run()
+ }
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a14bb377aa..a4ce8766d3 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ExecutorLauncher(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ExecutorLauncher(args).run()
+ }
}
}
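
All four files get the same treatment: the per-instance sparkUser field (read from SPARK_USER, falling back to SparkContext.SPARK_UNKNOWN_USER) and the per-thread SparkHadoopUtil.get.runAsUser(sparkUser) wrapper are removed, and the whole body of main is instead run through SparkHadoopUtil.get.runAsSparkUser. The helper itself is not part of this diff; a rough sketch of what such a wrapper typically looks like on top of Hadoop's UserGroupInformation (the body below is an assumption, not the committed implementation):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    object RunAsSparkUserSketch {
      // Hypothetical stand-in for SparkHadoopUtil.runAsSparkUser: run func as the
      // user named by SPARK_USER, or directly if the variable is unset.
      def runAsSparkUser(func: () => Unit) {
        Option(System.getenv("SPARK_USER")) match {
          case Some(user) =>
            val ugi = UserGroupInformation.createRemoteUser(user)
            // Copy the launching user's credentials (e.g. HDFS delegation tokens)
            // onto the proxy identity so Hadoop calls still authenticate.
            ugi.addCredentials(UserGroupInformation.getCurrentUser.getCredentials)
            ugi.doAs(new PrivilegedExceptionAction[Unit] {
              override def run(): Unit = func()
            })
          case None =>
            func()
        }
      }
    }

Centralizing the SPARK_USER lookup in the helper is presumably also why the SparkContext import, previously needed only for SPARK_UNKNOWN_USER, disappears from the ExecutorLauncher import lists in both the alpha and stable trees.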