From 3d0a02dff3011e8894d98d903cd086bc95e56807 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Sat, 3 May 2014 10:59:05 -0700 Subject: [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak Move the doAs in Executor higher up so that we only have 1 ugi and aren't leaking filesystems. Fix spark on yarn to work when the cluster is running as user "yarn" but the clients are launched as the user and want to read/write to hdfs as the user. Note this hasn't been fully tested yet. Need to test in standalone mode. Putting this up for people to look at and possibly test. I don't have access to a mesos cluster. This is alternative to https://github.com/apache/spark/pull/607 Author: Thomas Graves Closes #621 from tgravescs/SPARK-1676 and squashes the following commits: 244d55a [Thomas Graves] fix line length 44163d4 [Thomas Graves] Rework 9398853 [Thomas Graves] change to have doAs in executor higher up. --- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 17 +++++++-- .../executor/CoarseGrainedExecutorBackend.scala | 44 ++++++++++++---------- .../scala/org/apache/spark/executor/Executor.scala | 4 +- .../spark/executor/MesosExecutorBackend.scala | 14 ++++--- .../spark/deploy/yarn/ApplicationMaster.scala | 10 ++--- .../spark/deploy/yarn/ExecutorLauncher.scala | 7 +++- .../spark/deploy/yarn/ApplicationMaster.scala | 12 +++--- .../spark/deploy/yarn/ExecutorLauncher.scala | 7 +++- 8 files changed, 69 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 498fcc520a..e2df1b8954 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{Logging, SparkContext, SparkException} import scala.collection.JavaConversions._ /** * Contains util methods to interact with Hadoop from Spark. */ -class SparkHadoopUtil { +class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration() UserGroupInformation.setConfiguration(conf) - def runAsUser(user: String)(func: () => Unit) { + /** + * Runs the given function with a Hadoop UserGroupInformation as a thread local variable + * (distributed to child threads), used for authenticating HDFS and YARN calls. + * + * IMPORTANT NOTE: If this function is going to be called repeated in the same process + * you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly + * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems + */ + def runAsSparkUser(func: () => Unit) { + val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER) if (user != SparkContext.SPARK_UNKNOWN_USER) { + logDebug("running as user: " + user) val ugi = UserGroupInformation.createRemoteUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) ugi.doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = func() }) } else { + logDebug("running as SPARK_UNKNOWN_USER") func() } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9ac7365f47..e912ae8a5d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -22,8 +22,9 @@ import java.nio.ByteBuffer import akka.actor._ import akka.remote._ -import org.apache.spark.{SecurityManager, SparkConf, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.TaskState.TaskState +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -94,25 +95,30 @@ private[spark] class CoarseGrainedExecutorBackend( private[spark] object CoarseGrainedExecutorBackend { def run(driverUrl: String, executorId: String, hostname: String, cores: Int, - workerUrl: Option[String]) { - // Debug code - Utils.checkHost(hostname) - - val conf = new SparkConf - // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor - // before getting started with all our system properties, etc - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, - indestructible = true, conf = conf, new SecurityManager(conf)) - // set it - val sparkHostPort = hostname + ":" + boundPort - actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, - sparkHostPort, cores), - name = "Executor") - workerUrl.foreach{ url => - actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") + workerUrl: Option[String]) { + + SparkHadoopUtil.get.runAsSparkUser { () => + // Debug code + Utils.checkHost(hostname) + + val conf = new SparkConf + // Create a new ActorSystem to run the backend, because we can't create a + // SparkEnv / Executor before getting started with all our system properties, etc + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, + indestructible = true, conf = conf, new SecurityManager(conf)) + // set it + val sparkHostPort = hostname + ":" + boundPort + actorSystem.actorOf( + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, + sparkHostPort, cores), + name = "Executor") + workerUrl.foreach { + url => + actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") + } + actorSystem.awaitTermination() + } - actorSystem.awaitTermination() } def main(args: Array[String]) { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 272bcda5f8..98e7e0be81 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -128,8 +128,6 @@ private[spark] class Executor( // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] - val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER) - def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) runningTasks.put(taskId, tr) @@ -172,7 +170,7 @@ private[spark] class Executor( } } - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => + override def run() { val startTime = System.currentTimeMillis() SparkEnv.set(env) Thread.currentThread.setContextClassLoader(replClassLoader) diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 64e24506e8..9b56f711e0 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -23,10 +23,10 @@ import com.google.protobuf.ByteString import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary} import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} -import org.apache.spark.Logging -import org.apache.spark.TaskState +import org.apache.spark.{Logging, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.util.Utils +import org.apache.spark.deploy.SparkHadoopUtil private[spark] class MesosExecutorBackend extends MesosExecutor @@ -95,9 +95,11 @@ private[spark] class MesosExecutorBackend */ private[spark] object MesosExecutorBackend { def main(args: Array[String]) { - MesosNativeLibrary.load() - // Create a new Executor and start it running - val runner = new MesosExecutorBackend() - new MesosExecutorDriver(runner).run() + SparkHadoopUtil.get.runAsSparkUser { () => + MesosNativeLibrary.load() + // Create a new Executor and start it running + val runner = new MesosExecutorBackend() + new MesosExecutorDriver(runner).run() + } } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index fc13dbecb4..8f0ecb8557 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var registered = false - private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( - SparkContext.SPARK_UNKNOWN_USER) - def run() { // Setup the directories so things go to yarn approved directories rather // then user specified and /tmp. @@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => + override def run() { + var successed = false try { // Copy @@ -480,6 +478,8 @@ object ApplicationMaster { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ApplicationMaster(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ApplicationMaster(args).run() + } } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 65b7215afb..a3bd91590f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo +import org.apache.spark.deploy.SparkHadoopUtil /** * An application master that allocates executors on behalf of a driver that is running outside @@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ExecutorLauncher(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ExecutorLauncher(args).run() + } } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 90e807160d..c1dfe3f53b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false - - private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( - SparkContext.SPARK_UNKNOWN_USER) def run() { // Setup the directories so things go to YARN approved directories rather @@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, false /* initialize */ , Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => - var successed = false + override def run() { + + var successed = false try { // Copy var mainArgs: Array[String] = new Array[String](args.userArgs.size) @@ -462,6 +460,8 @@ object ApplicationMaster { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ApplicationMaster(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ApplicationMaster(args).run() + } } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a14bb377aa..a4ce8766d3 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.spark.deploy.SparkHadoopUtil /** * An application master that allocates executors on behalf of a driver that is running outside @@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp object ExecutorLauncher { def main(argStrings: Array[String]) { val args = new ApplicationMasterArguments(argStrings) - new ExecutorLauncher(args).run() + SparkHadoopUtil.get.runAsSparkUser { () => + new ExecutorLauncher(args).run() + } } } -- cgit v1.2.3