author     Patrick Wendell <pwendell@gmail.com>    2014-04-21 10:26:33 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-04-21 10:26:33 -0700
commit     fb98488fc8e68cc84f6e0750fd4e9e29029879d2 (patch)
tree       eba99b56bea8ec2e357020a413bf9cf04a4e3308 /yarn
parent     3a390bfd80f80739b9d847780eccc443fc2dc0ea (diff)
Clean up and simplify Spark configuration
Over time, as we've added more deployment modes, things have gotten a bit unwieldy with user-facing configuration options in Spark. Going forward we'll advise all users to run `spark-submit` to launch applications. This is a WIP patch, but it makes the following improvements:

1. Improved `spark-env.sh.template`, which was missing a lot of things users now set in that file.
2. Removes the shipping of SPARK_CLASSPATH, SPARK_JAVA_OPTS, and SPARK_LIBRARY_PATH to the executors on the cluster. This was an ugly hack. Instead it introduces the config variables spark.executor.extraJavaOpts, spark.executor.extraLibraryPath, and spark.executor.extraClassPath.
3. Adds the ability to set these same variables for the driver using `spark-submit`.
4. Allows you to load system properties from a `spark-defaults.conf` file when running `spark-submit`. This will allow setting both SparkConf options and other system properties utilized by `spark-submit`.
5. Made `SPARK_LOCAL_IP` an environment variable rather than a SparkConf property. This is more consistent with it being set on each node.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #299 from pwendell/config-cleanup and squashes the following commits:

127f301 [Patrick Wendell] Improvements to testing
a006464 [Patrick Wendell] Moving properties file template.
b4b496c [Patrick Wendell] spark-defaults.properties -> spark-defaults.conf
0086939 [Patrick Wendell] Minor style fixes
af09e3e [Patrick Wendell] Mention config file in docs and clean-up docs
b16e6a2 [Patrick Wendell] Cleanup of spark-submit script and Scala quick start guide
af0adf7 [Patrick Wendell] Automatically add user jar
a56b125 [Patrick Wendell] Responses to Tom's review
d50c388 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into config-cleanup
a762901 [Patrick Wendell] Fixing test failures
ffa00fe [Patrick Wendell] Review feedback
fda0301 [Patrick Wendell] Note
308f1f6 [Patrick Wendell] Properly escape quotes and other clean-up for YARN
e83cd8f [Patrick Wendell] Changes to allow re-use of test applications
be42f35 [Patrick Wendell] Handle case where SPARK_HOME is not set
c2a2909 [Patrick Wendell] Test compile fixes
4ee6f9d [Patrick Wendell] Making YARN doc changes consistent
afc9ed8 [Patrick Wendell] Cleaning up line limits and two compile errors.
b08893b [Patrick Wendell] Additional improvements.
ace4ead [Patrick Wendell] Responses to review feedback.
b72d183 [Patrick Wendell] Review feedback for spark env file
46555c1 [Patrick Wendell] Review feedback and import clean-ups
437aed1 [Patrick Wendell] Small fix
761ebcd [Patrick Wendell] Library path and classpath for drivers
7cc70e4 [Patrick Wendell] Clean up terminology inside of spark-env script
5b0ba8e [Patrick Wendell] Don't ship executor envs
84cc5e5 [Patrick Wendell] Small clean-up
1f75238 [Patrick Wendell] SPARK_JAVA_OPTS --> SPARK_MASTER_OPTS for master settings
4982331 [Patrick Wendell] Remove SPARK_LIBRARY_PATH
6eaf7d0 [Patrick Wendell] executorJavaOpts
0faa3b6 [Patrick Wendell] Stash of adding config options in submit script and YARN
ac2d65e [Patrick Wendell] Change spark.local.dir -> SPARK_LOCAL_DIRS
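As a quick, illustrative sketch (not part of the patch itself), the new keys visible in the diff below can also be set programmatically on a SparkConf; the values here are made up for demonstration, and the key names are the ones that actually appear in the code changes:

    import org.apache.spark.SparkConf

    // Hypothetical values; only the key names come from this patch.
    val conf = new SparkConf()
      .set("spark.driver.extraJavaOptions", "-verbose:gc")
      .set("spark.driver.extraClassPath", "/opt/driver-libs/*")
      .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
      .set("spark.executor.extraClassPath", "/opt/executor-libs/*")

The same keys can equally live in the new `spark-defaults.conf` file and be picked up by `spark-submit`.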
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala               |   3
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                          |   4
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala                |   6
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala           |   4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala                     | 137
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala           |  21
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala |   4
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala              |   3
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                         |   4
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala               |   6
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala          |   4
11 files changed, 116 insertions(+), 80 deletions(-)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f078d06aaf..2f74965900 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,6 +39,9 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
+/**
+ * An application master that runs the user's driver program and allocates executors.
+ */
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf: SparkConf) extends Logging {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0179b0600c..00c7649e68 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.util.{Apps, Records}
import org.apache.spark.{Logging, SparkConf}
-
+/**
+ * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
+ */
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 21f14576ef..ea356f33eb 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,6 +34,12 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
+/**
+ * An application master that allocates executors on behalf of a driver that is running outside
+ * the cluster.
+ *
+ * This is used only in yarn-client mode.
+ */
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index d6d46a5f6c..95f0f9d0ff 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -54,6 +54,10 @@ object AllocationType extends Enumeration {
// Note that right now, we assume all node asks are uniform in terms of capabilities and priority
// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
// more info on how we request containers.
+
+/**
+ * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ */
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 566de712fc..c00b63669c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -22,16 +22,13 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
@@ -39,19 +36,18 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Records, Apps}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
+import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
- * which will launch a Spark master process and negotiate resources throughout its duration.
+ * Client submits an application to the YARN ResourceManager.
+ *
+ * Depending on the deployment mode this will launch one of two application master classes:
+ * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * which launches a driver program inside of the cluster.
+ * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
+ * request executors on behalf of a driver running outside of the cluster.
*/
trait ClientBase extends Logging {
val args: ClientArguments
@@ -70,7 +66,6 @@ trait ClientBase extends Logging {
// TODO(harvey): This could just go in ClientArguments.
def validateArgs() = {
Map(
- (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
@@ -208,7 +203,7 @@ trait ClientBase extends Logging {
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
Map(
- ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
+ ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar,
ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
).foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -251,8 +246,10 @@ trait ClientBase extends Logging {
logInfo("Setting up the launch environment")
val env = new HashMap[String, String]()
+
+ val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
- ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
+ ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -268,9 +265,6 @@ trait ClientBase extends Logging {
YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
File.pathSeparator)
- // Add each SPARK_* key to the environment.
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-
env
}
@@ -299,13 +293,13 @@ trait ClientBase extends Logging {
val amMemory = calculateAMMemory(newApp)
- var JAVA_OPTS = ""
+ val JAVA_OPTS = ListBuffer[String]()
// Add Xmx for AM memory
JAVA_OPTS += "-Xmx" + amMemory + "m"
val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
- JAVA_OPTS += " -Djava.io.tmpdir=" + tmpDir
+ JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
@@ -319,35 +313,48 @@ trait ClientBase extends Logging {
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
+ JAVA_OPTS += "-XX:+CMSIncrementalMode"
+ JAVA_OPTS += "-XX:+CMSIncrementalPacing"
+ JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
+ JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
}
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
+ // TODO: it might be nicer to pass these as an internal environment variable rather than
+ // as Java options, due to complications with string parsing of nested quotes.
+ if (args.amClass == classOf[ExecutorLauncher].getName) {
+ // If we are being launched in client mode, forward the spark-conf options
+ // onto the executor launcher
+ for ((k, v) <- sparkConf.getAll) {
+ JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+ }
+ } else {
+ // If we are being launched in standalone mode, capture and forward any spark
+ // system properties (e.g. set by spark-class).
+ for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
+ JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+ }
+ sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
+ sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
}
JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
// Command for the ApplicationMaster
- val commands = List[String](
- Environment.JAVA_HOME.$() + "/bin/java" +
- " -server " +
- JAVA_OPTS +
- " " + args.amClass +
- " --class " + args.userClass +
- " --jar " + args.userJar +
- userArgsToString(args) +
- " --executor-memory " + args.executorMemory +
- " --executor-cores " + args.executorCores +
- " --num-executors " + args.numExecutors +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
- logInfo("Command for starting the Spark ApplicationMaster: " + commands(0))
- amContainer.setCommands(commands)
+ val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+ JAVA_OPTS ++
+ Seq(args.amClass, "--class", args.userClass, "--jar", args.userJar,
+ userArgsToString(args),
+ "--executor-memory", args.executorMemory.toString,
+ "--executor-cores", args.executorCores.toString,
+ "--num-executors ", args.numExecutors.toString,
+ "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+ logInfo("Command for starting the Spark ApplicationMaster: " + commands)
+
+ // TODO: it would be nicer to just make sure there are no null commands here
+ val printableCommands = commands.map(s => if (s == null) "null" else s).toList
+ amContainer.setCommands(printableCommands)
setupSecurityToken(amContainer)
amContainer
@@ -361,6 +368,8 @@ object ClientBase {
val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
val LOCAL_SCHEME = "local"
+ def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
+
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
val classpathEntries = Option(conf.getStrings(
@@ -433,10 +442,9 @@ object ClientBase {
" -Dlog4j.configuration=" + log4jConf
}
- def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
- log4jConf: String, env: HashMap[String, String]) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
- File.pathSeparator)
+ def populateClasspath(conf: Configuration, sparkConf: SparkConf, log4jConf: String,
+ env: HashMap[String, String], extraClassPath: Option[String] = None) {
+
if (log4jConf != null) {
// If a custom log4j config file is provided as a local: URI, add its parent directory to the
// classpath. Note that this only works if the custom config's file name is
@@ -448,19 +456,26 @@ object ClientBase {
File.pathSeparator)
}
}
+
+ /** Add entry to the classpath. */
+ def addClasspathEntry(path: String) = Apps.addToEnvironment(env, Environment.CLASSPATH.name, path)
+ /** Add entry to the classpath. Interpreted as a path relative to the working directory. */
+ def addPwdClasspathEntry(entry: String) = addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
+
+ extraClassPath.foreach(addClasspathEntry)
+
+ addClasspathEntry(Environment.PWD.$())
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
- .toBoolean
- if (userClasspathFirst) {
- addUserClasspath(args, env)
- }
- addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
- ClientBase.populateHadoopClasspath(conf, env)
- if (!userClasspathFirst) {
- addUserClasspath(args, env)
+ if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
+ addPwdClasspathEntry(APP_JAR)
+ addPwdClasspathEntry(SPARK_JAR)
+ ClientBase.populateHadoopClasspath(conf, env)
+ } else {
+ addPwdClasspathEntry(SPARK_JAR)
+ ClientBase.populateHadoopClasspath(conf, env)
+ addPwdClasspathEntry(APP_JAR)
}
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
- Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+ addPwdClasspathEntry("*")
}
/**
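To make the quoting logic in the hunk above easier to follow, here is a standalone sketch of how SparkConf entries are turned into -D JVM options (the sample conf value is illustrative only):

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf().set("spark.executor.memory", "2g")
    val javaOpts = ListBuffer[String]()
    for ((k, v) <- sparkConf.getAll) {
      // The value is wrapped in escaped quotes so it survives the extra round of
      // shell interpretation YARN applies when materializing the launch command.
      javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
    }
    // javaOpts now holds entries such as: -Dspark.executor.memory=\"2g\"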
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 40b38661f7..7d07f6f680 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -19,26 +19,18 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, Logging}
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.spark.{Logging, SparkConf}
trait ExecutorRunnableUtil extends Logging {
@@ -58,8 +50,10 @@ trait ExecutorRunnableUtil extends Logging {
// Set the JVM memory
val executorMemoryString = executorMemory + "m"
JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+
+ // Set extra Java options for the executor, if defined
+ sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
+ JAVA_OPTS += opts
}
JAVA_OPTS += " -Djava.io.tmpdir=" +
@@ -162,8 +156,9 @@ trait ExecutorRunnableUtil extends Logging {
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
+ val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
- ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+ ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
if (log4jConf != null) {
env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
}
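The classpath ordering that populateClasspath now applies (see the ClientBase hunk above) can be sketched on its own. Note that addEntry and the "hadoop-cp" placeholder below are stand-ins for the real addPwdClasspathEntry helper and the Hadoop classpath entries:

    import scala.collection.mutable.ListBuffer

    def orderedClasspath(userClasspathFirst: Boolean): List[String] = {
      val entries = ListBuffer[String]()
      def addEntry(e: String): Unit = entries += e  // stand-in for addPwdClasspathEntry
      if (userClasspathFirst) {
        addEntry("app.jar")    // the user's jar wins conflicts with Spark's classes
        addEntry("spark.jar")
        addEntry("hadoop-cp")
      } else {
        addEntry("spark.jar")  // default: Spark's jar takes precedence
        addEntry("hadoop-cp")
        addEntry("app.jar")
      }
      entries.toList
    }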
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 161918859e..ce2dde0631 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
import org.apache.spark.scheduler.TaskSchedulerImpl
import scala.collection.mutable.ArrayBuffer
@@ -54,7 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
"--class", "notused",
"--jar", null,
"--args", hostport,
- "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
+ "--am-class", classOf[ExecutorLauncher].getName
)
// process any optional arguments, given either as environment variables
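Swapping the string literal for classOf[ExecutorLauncher].getName in the hunk above is a small robustness win: the reference is now checked by the compiler. A toy illustration:

    import org.apache.spark.deploy.yarn.ExecutorLauncher

    // Resolved at compile time, so renaming or moving ExecutorLauncher breaks the
    // build rather than failing when the application master is launched.
    val amClass: String = classOf[ExecutorLauncher].getName
    // amClass == "org.apache.spark.deploy.yarn.ExecutorLauncher"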
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b225be6a79..90e807160d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,6 +42,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
+/**
+ * An application master that runs the user's driver program and allocates executors.
+ */
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf: SparkConf) extends Logging {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 77eb1276a0..2e2fb5d4fa 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -34,9 +34,7 @@ import org.apache.spark.{Logging, SparkConf}
/**
- * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
- * which will launch a Spark master process and negotiate resources throughout its duration.
+ * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
*/
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 67ed591c78..a14bb377aa 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -35,6 +35,12 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+/**
+ * An application master that allocates executors on behalf of a driver that is running outside
+ * the cluster.
+ *
+ * This is used only in yarn-client mode.
+ */
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4fafae1aff..a979fe4d62 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -56,6 +56,10 @@ object AllocationType extends Enumeration {
// Note that right now, we assume all node asks are uniform in terms of capabilities and priority
// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
// more info on how we request containers.
+
+/**
+ * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ */
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val amClient: AMRMClient[ContainerRequest],
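For readers unfamiliar with the stable allocation API used by this handler, a minimal, standalone sketch of asking the ResourceManager for executor containers via AMRMClient might look as follows. The requestExecutors function and its parameters are hypothetical; the YARN calls themselves are the real stable API, and this is not the handler's actual implementation:

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    def requestExecutors(
        amClient: AMRMClient[ContainerRequest],
        num: Int, memoryMb: Int, cores: Int): Unit = {
      val capability = Resource.newInstance(memoryMb, cores)
      val priority = Priority.newInstance(1)
      // Ask for `num` containers; node/rack locality hints are omitted here.
      (1 to num).foreach { _ =>
        amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
      }
      // A subsequent allocate() heartbeat returns whatever was granted so far.
      val granted = amClient.allocate(0.1f).getAllocatedContainers
      println(s"Granted ${granted.size()} containers so far")
    }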