path: root/yarn/stable
author    Thomas Graves <tgraves@apache.org>    2014-05-03 10:59:05 -0700
committer Aaron Davidson <aaron@databricks.com> 2014-05-03 10:59:05 -0700
commit    3d0a02dff3011e8894d98d903cd086bc95e56807 (patch)
tree      90558a3fafb9761aea1f96ff84bd4544ec36fad6 /yarn/stable
parent    9347565f4188cf1574c6dc49fcde91eb286be955 (diff)
download  spark-3d0a02dff3011e8894d98d903cd086bc95e56807.tar.gz
          spark-3d0a02dff3011e8894d98d903cd086bc95e56807.tar.bz2
          spark-3d0a02dff3011e8894d98d903cd086bc95e56807.zip
[WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak
Move the doAs in Executor higher up so that we only have 1 UGI and aren't leaking filesystems. Fix Spark on YARN to work when the cluster is running as user "yarn" but the clients are launched as another user and want to read/write to HDFS as that user.

Note this hasn't been fully tested yet; it still needs testing in standalone mode. Putting this up for people to look at and possibly test. I don't have access to a Mesos cluster. This is an alternative to https://github.com/apache/spark/pull/607

Author: Thomas Graves <tgraves@apache.org>

Closes #621 from tgravescs/SPARK-1676 and squashes the following commits:

244d55a [Thomas Graves] fix line length
44163d4 [Thomas Graves] Rework
9398853 [Thomas Graves] change to have doAs in executor higher up.
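The core of the fix is a single runAsSparkUser entry point on SparkHadoopUtil that both the ApplicationMaster and the ExecutorLauncher now call from main(), as the diff below shows. A minimal sketch of what such an entry point can look like, built around the SPARK_USER lookup visible in the removed lines of the diff; it illustrates the approach and is not the verbatim Spark source:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object RunAsSparkUserSketch {
  // Stand-in for SparkContext.SPARK_UNKNOWN_USER; the real constant lives in Spark.
  private val SPARK_UNKNOWN_USER = "<unknown>"

  // Create one UGI for the SPARK_USER and run the whole block under it, so every
  // FileSystem handle opened inside is cached against that single UGI instead of
  // a fresh one per doAs call.
  def runAsSparkUser(func: () => Unit): Unit = {
    val user = Option(System.getenv("SPARK_USER")).getOrElse(SPARK_UNKNOWN_USER)
    if (user != SPARK_UNKNOWN_USER) {
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = func()
      })
    } else {
      func()
    }
  }
}

Wrapping main() once, rather than wrapping each thread in its own doAs, is what keeps the process down to a single UGI and stops the FileSystem cache from growing with every task.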
Diffstat (limited to 'yarn/stable')
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 12
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala   |  7
2 files changed, 11 insertions, 8 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 90e807160d..c1dfe3f53b 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
-
- private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
- SparkContext.SPARK_UNKNOWN_USER)
def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
- var successed = false
+ override def run() {
+
+ var successed = false
try {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,8 @@ object ApplicationMaster {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ApplicationMaster(args).run()
+ }
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a14bb377aa..a4ce8766d3 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ExecutorLauncher(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ExecutorLauncher(args).run()
+ }
}
}
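The executor-side half of the change is not part of this yarn/stable diff, but the commit message describes it as the same move: push the doAs up to the process entry point so all task threads share one cached UGI. A rough sketch of that shape, reusing the runAsSparkUser call visible above; the object and helper names are illustrative stand-ins, not the actual executor backend:

import org.apache.spark.deploy.SparkHadoopUtil

object ExecutorEntryPointSketch {
  def main(args: Array[String]): Unit = {
    // One doAs for the whole process: every thread started inside this block,
    // and every FileSystem it opens, runs under the single cached UGI.
    SparkHadoopUtil.get.runAsSparkUser { () =>
      startBackendAndRunTasks(args) // hypothetical stand-in for the real startup code
    }
  }

  // Hypothetical placeholder for registering with the driver and running tasks.
  private def startBackendAndRunTasks(args: Array[String]): Unit = {
    // ...
  }
}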