core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala                | 17
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 44
core/src/main/scala/org/apache/spark/executor/Executor.scala                     |  4
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala         | 14
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala   | 10
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala    |  7
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 12
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala   |  7
8 files changed, 69 insertions, 46 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 498fcc520a..e2df1b8954 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
import scala.collection.JavaConversions._
/**
* Contains util methods to interact with Hadoop from Spark.
*/
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)
- def runAsUser(user: String)(func: () => Unit) {
+ /**
+ * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+ * (distributed to child threads), used for authenticating HDFS and YARN calls.
+ *
+ * IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
+ * you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+ * call FileSystem.closeAllForUGI in order to avoid leaking FileSystems
+ */
+ def runAsSparkUser(func: () => Unit) {
+ val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER) {
+ logDebug("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
} else {
+ logDebug("running as SPARK_UNKNOWN_USER")
func()
}
}
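The new doc comment points at HDFS-3545: FileSystem.get caches instances keyed in part by the calling UGI, so a process that creates a fresh remote UGI on every call accumulates cached FileSystems. Below is a minimal standalone sketch of the cleanup the comment suggests; the object name and the "spark" user are illustrative, not part of this patch.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

// Illustrative only: run work under fresh remote UGIs repeatedly, closing the
// per-UGI FileSystem cache entries afterwards to avoid the HDFS-3545 leak.
object UgiCleanupSketch {
  def main(args: Array[String]): Unit = {
    for (i <- 1 to 3) {
      val ugi = UserGroupInformation.createRemoteUser("spark") // hypothetical user
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run(): Unit = {
          // FileSystem.get caches instances keyed in part by the calling UGI,
          // so each new UGI adds another cached FileSystem.
          val fs = FileSystem.get(new Configuration())
          fs.exists(new Path("/tmp"))
        }
      })
      // Evict this UGI's cached FileSystems once the work is done.
      FileSystem.closeAllForUGI(ugi)
    }
  }
}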
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9ac7365f47..e912ae8a5d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,30 @@ private[spark] class CoarseGrainedExecutorBackend(
private[spark] object CoarseGrainedExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
- workerUrl: Option[String]) {
- // Debug code
- Utils.checkHost(hostname)
-
- val conf = new SparkConf
- // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
- // before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- indestructible = true, conf = conf, new SecurityManager(conf))
- // set it
- val sparkHostPort = hostname + ":" + boundPort
- actorSystem.actorOf(
- Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
- sparkHostPort, cores),
- name = "Executor")
- workerUrl.foreach{ url =>
- actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+ workerUrl: Option[String]) {
+
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ // Debug code
+ Utils.checkHost(hostname)
+
+ val conf = new SparkConf
+ // Create a new ActorSystem to run the backend, because we can't create a
+ // SparkEnv / Executor before getting started with all our system properties, etc
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+ indestructible = true, conf = conf, new SecurityManager(conf))
+ // Build the executor's host:port from the actor system's bound port
+ val sparkHostPort = hostname + ":" + boundPort
+ actorSystem.actorOf(
+ Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+ sparkHostPort, cores),
+ name = "Executor")
+ workerUrl.foreach { url =>
+ actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+ }
+ actorSystem.awaitTermination()
}
- actorSystem.awaitTermination()
}
def main(args: Array[String]) {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 272bcda5f8..98e7e0be81 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,8 +128,6 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
- val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@ private[spark] class Executor(
}
}
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+ override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
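With this change TaskRunner.run no longer wraps each task in runAsUser; it relies on the single runAsSparkUser call at the executor entry point. That is sound because the Subject installed by ugi.doAs rides on the AccessControlContext, which a child thread inherits from the thread that created it. This is what the doc comment means by "distributed to child threads". A small illustrative sketch, not part of the patch:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Illustrative only: a thread spawned inside doAs sees the same current user,
// which is why one doAs around the executor entry point covers task threads.
object UgiInheritanceSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser("spark") // hypothetical user
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = {
        val child = new Thread(new Runnable {
          def run(): Unit =
            // Inherited AccessControlContext => same UGI as the parent thread.
            println("child runs as: " + UserGroupInformation.getCurrentUser.getUserName)
        })
        child.start()
        child.join()
      }
    })
  }
}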
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 64e24506e8..9b56f711e0 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{Logging, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -95,9 +95,11 @@ private[spark] class MesosExecutorBackend
*/
private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
- MesosNativeLibrary.load()
- // Create a new Executor and start it running
- val runner = new MesosExecutorBackend()
- new MesosExecutorDriver(runner).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ MesosNativeLibrary.load()
+ // Create a new Executor and start it running
+ val runner = new MesosExecutorBackend()
+ new MesosExecutorDriver(runner).run()
+ }
}
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index fc13dbecb4..8f0ecb8557 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false
- private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
- SparkContext.SPARK_UNKNOWN_USER)
-
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+ override def run() {
var successed = false
try {
// Copy
@@ -480,6 +478,8 @@ object ApplicationMaster {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ApplicationMaster(args).run()
+ }
}
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 65b7215afb..a3bd91590f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ExecutorLauncher(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ExecutorLauncher(args).run()
+ }
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 90e807160d..c1dfe3f53b 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
-
- private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
- SparkContext.SPARK_UNKNOWN_USER)
def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
- var successed = false
+ override def run() {
+ var successed = false
try {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,8 @@ object ApplicationMaster {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ApplicationMaster(args).run()
+ }
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a14bb377aa..a4ce8766d3 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
- new ExecutorLauncher(args).run()
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ new ExecutorLauncher(args).run()
+ }
}
}