aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-04-01 10:52:13 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-04-01 10:52:13 -0700
commit8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e (patch)
treebbe9310838ffdf087dc628d4c9993ba58c932ce1 /yarn
parent58e6bc827f1f9dc1afee07dca1bee1f56553dd20 (diff)
downloadspark-8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e.tar.gz
spark-8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e.tar.bz2
spark-8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e.zip
[SPARK-12343][YARN] Simplify Yarn client and client argument
## What changes were proposed in this pull request? Currently in Spark on YARN, configurations can be passed through SparkConf, env and command arguments, some parts are duplicated, like client argument and SparkConf. So here propose to simplify the command arguments. ## How was this patch tested? This patch is tested manually with unit test. CC vanzin tgravescs , please help to suggest this proposal. The original purpose of this JIRA is to remove `ClientArguments`, through refactoring some arguments like `--class`, `--arg` are not so easy to replace, so here I remove the most part of command line arguments, only keep the minimal set. Author: jerryshao <sshao@hortonworks.com> Closes #11603 from jerryshao/SPARK-12343.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala18
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala89
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala189
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala6
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala5
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala18
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala42
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala19
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala8
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala46
12 files changed, 138 insertions, 306 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e941089d1b..9e8453429c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -662,7 +662,7 @@ object ApplicationMaster extends Logging {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
- master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
+ master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 6987e5a55f..5cdec87667 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -27,8 +27,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: Seq[String] = Nil
- var executorMemory = 1024
- var executorCores = 1
var propertiesFile: String = null
parseArgs(args.toList)
@@ -58,18 +56,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
primaryRFile = value
args = tail
- case ("--args" | "--arg") :: value :: tail =>
+ case ("--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
- case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
- executorMemory = value
- args = tail
-
- case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
- executorCores = value
- args = tail
-
case ("--properties-file") :: value :: tail =>
propertiesFile = value
args = tail
@@ -101,12 +91,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --primary-r-file A main R file
- | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
- | place on the PYTHONPATH for Python apps.
- | --args ARGS Arguments to be passed to your application's main class.
+ | --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
- | --executor-cores NUM Number of cores for the executors (Default: 1)
- | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --properties-file FILE Path to a custom Spark properties file.
""".stripMargin)
// scalastyle:on println
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f0f13a16e0..4dd3ccdf37 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -64,21 +64,44 @@ private[spark] class Client(
extends Logging {
import Client._
+ import YarnSparkHadoopUtil._
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
- private var credentials: Credentials = null
- private val amMemoryOverhead = args.amMemoryOverhead // MB
- private val executorMemoryOverhead = args.executorMemoryOverhead // MB
+
+ private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+
+ // AM related configurations
+ private val amMemory = if (isClusterMode) {
+ sparkConf.get(DRIVER_MEMORY).toInt
+ } else {
+ sparkConf.get(AM_MEMORY).toInt
+ }
+ private val amMemoryOverhead = {
+ val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
+ sparkConf.get(amMemoryOverheadEntry).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+ }
+ private val amCores = if (isClusterMode) {
+ sparkConf.get(DRIVER_CORES)
+ } else {
+ sparkConf.get(AM_CORES)
+ }
+
+ // Executor related configurations
+ private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+
private val distCacheMgr = new ClientDistributedCacheManager()
- private val isClusterMode = args.isClusterMode
private var loginFromKeytab = false
private var principal: String = null
private var keytab: String = null
+ private var credentials: Credentials = null
private val launcherBackend = new LauncherBackend() {
override def onStopRequest(): Unit = {
@@ -179,8 +202,8 @@ private[spark] class Client(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = newApp.getApplicationSubmissionContext
- appContext.setApplicationName(args.appName)
- appContext.setQueue(args.amQueue)
+ appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
+ appContext.setQueue(sparkConf.get(QUEUE_NAME))
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
@@ -217,8 +240,8 @@ private[spark] class Client(
}
val capability = Records.newRecord(classOf[Resource])
- capability.setMemory(args.amMemory + amMemoryOverhead)
- capability.setVirtualCores(args.amCores)
+ capability.setMemory(amMemory + amMemoryOverhead)
+ capability.setVirtualCores(amCores)
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
@@ -272,16 +295,16 @@ private[spark] class Client(
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
- val executorMem = args.executorMemory + executorMemoryOverhead
+ val executorMem = executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+ throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
"Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
"'yarn.nodemanager.resource.memory-mb'.")
}
- val amMem = args.amMemory + amMemoryOverhead
+ val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {
- throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+ throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
"Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
}
@@ -493,17 +516,15 @@ private[spark] class Client(
*/
val cachedSecondaryJarLinks = ListBuffer.empty[String]
List(
- (args.addJars, LocalResourceType.FILE, true),
- (args.files, LocalResourceType.FILE, false),
- (args.archives, LocalResourceType.ARCHIVE, false)
+ (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true),
+ (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false),
+ (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false)
).foreach { case (flist, resType, addToClasspath) =>
- if (flist != null && !flist.isEmpty()) {
- flist.split(',').foreach { file =>
- val (_, localizedPath) = distribute(file, resType = resType)
- require(localizedPath != null)
- if (addToClasspath) {
- cachedSecondaryJarLinks += localizedPath
- }
+ flist.foreach { file =>
+ val (_, localizedPath) = distribute(file, resType = resType)
+ require(localizedPath != null)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += localizedPath
}
}
}
@@ -519,7 +540,7 @@ private[spark] class Client(
// The python files list needs to be treated especially. All files that are not an
// archive need to be placed in a subdirectory that will be added to PYTHONPATH.
- args.pyFiles.foreach { f =>
+ sparkConf.get(PY_FILES).foreach { f =>
val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
distribute(f, targetDir = targetDir)
}
@@ -678,7 +699,7 @@ private[spark] class Client(
//
// NOTE: the code currently does not handle .py files defined with a "local:" scheme.
val pythonPath = new ListBuffer[String]()
- val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+ val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
if (pyFiles.nonEmpty) {
pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_PYTHON_DIR)
@@ -775,7 +796,7 @@ private[spark] class Client(
var prefixEnv: Option[String] = None
// Add Xmx for AM memory
- javaOpts += "-Xmx" + args.amMemory + "m"
+ javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
@@ -879,8 +900,6 @@ private[spark] class Client(
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
userArgs ++ Seq(
- "--executor-memory", args.executorMemory.toString + "m",
- "--executor-cores", args.executorCores.toString,
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
@@ -919,10 +938,10 @@ private[spark] class Client(
}
def setupCredentials(): Unit = {
- loginFromKeytab = args.principal != null || sparkConf.contains(PRINCIPAL.key)
+ loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
if (loginFromKeytab) {
- principal = Option(args.principal).orElse(sparkConf.get(PRINCIPAL)).get
- keytab = Option(args.keytab).orElse(sparkConf.get(KEYTAB)).orNull
+ principal = sparkConf.get(PRINCIPAL).get
+ keytab = sparkConf.get(KEYTAB).orNull
require(keytab != null, "Keytab must be specified when principal is specified.")
logInfo("Attempting to login to the Kerberos" +
@@ -1084,7 +1103,7 @@ private[spark] class Client(
}
-object Client extends Logging {
+private object Client extends Logging {
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
@@ -1097,11 +1116,7 @@ object Client extends Logging {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- val args = new ClientArguments(argStrings, sparkConf)
- // to maintain backwards-compatibility
- if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
- sparkConf.setIfMissing(EXECUTOR_INSTANCES, args.numExecutors)
- }
+ val args = new ClientArguments(argStrings)
new Client(args, sparkConf).run()
}
@@ -1246,7 +1261,7 @@ object Client extends Logging {
val secondaryJars =
if (args != null) {
- getSecondaryJarUris(Option(args.addJars).map(_.split(",").toSeq))
+ getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE)))
} else {
getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 47b4cc3009..61c027ec44 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -19,118 +19,20 @@ package org.apache.spark.deploy.yarn
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.config._
-import org.apache.spark.util.{IntParam, MemoryParam, Utils}
-
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-private[spark] class ClientArguments(
- args: Array[String],
- sparkConf: SparkConf) {
+private[spark] class ClientArguments(args: Array[String]) {
- var addJars: String = null
- var files: String = null
- var archives: String = null
var userJar: String = null
var userClass: String = null
- var pyFiles: Seq[String] = Nil
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
- var executorMemory = 1024 // MB
- var executorCores = 1
- var numExecutors = DEFAULT_NUMBER_EXECUTORS
- var amQueue = sparkConf.get(QUEUE_NAME)
- var amMemory: Int = _
- var amCores: Int = _
- var appName: String = "Spark"
- var priority = 0
- var principal: String = null
- var keytab: String = null
- def isClusterMode: Boolean = userClass != null
-
- private var driverMemory: Int = Utils.DEFAULT_DRIVER_MEM_MB // MB
- private var driverCores: Int = 1
- private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
parseArgs(args.toList)
- loadEnvironmentArgs()
- validateArgs()
-
- // Additional memory to allocate to containers
- val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
- val amMemoryOverhead = sparkConf.get(amMemoryOverheadEntry).getOrElse(
- math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
-
- val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
- math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
-
- /** Load any default arguments provided through environment variables and Spark properties. */
- private def loadEnvironmentArgs(): Unit = {
- // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
- // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
- files = Option(files)
- .orElse(sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
- .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
- .orNull
- archives = Option(archives)
- .orElse(sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
- .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
- .orNull
- // If dynamic allocation is enabled, start at the configured initial number of executors.
- // Default to minExecutors if no initialExecutors is set.
- numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
- principal = Option(principal)
- .orElse(sparkConf.get(PRINCIPAL))
- .orNull
- keytab = Option(keytab)
- .orElse(sparkConf.get(KEYTAB))
- .orNull
- }
-
- /**
- * Fail fast if any arguments provided are invalid.
- * This is intended to be called only after the provided arguments have been parsed.
- */
- private def validateArgs(): Unit = {
- if (numExecutors < 0 || (!isDynamicAllocationEnabled && numExecutors == 0)) {
- throw new IllegalArgumentException(
- s"""
- |Number of executors was $numExecutors, but must be at least 1
- |(or 0 if dynamic executor allocation is enabled).
- |${getUsageMessage()}
- """.stripMargin)
- }
- if (executorCores < sparkConf.get(CPUS_PER_TASK)) {
- throw new SparkException(s"Executor cores must not be less than ${CPUS_PER_TASK.key}.")
- }
- // scalastyle:off println
- if (isClusterMode) {
- for (key <- Seq(AM_MEMORY.key, AM_MEMORY_OVERHEAD.key, AM_CORES.key)) {
- if (sparkConf.contains(key)) {
- println(s"$key is set but does not apply in cluster mode.")
- }
- }
- amMemory = driverMemory
- amCores = driverCores
- } else {
- for (key <- Seq(DRIVER_MEMORY_OVERHEAD.key, DRIVER_CORES.key)) {
- if (sparkConf.contains(key)) {
- println(s"$key is set but does not apply in client mode.")
- }
- }
- amMemory = sparkConf.get(AM_MEMORY).toInt
- amCores = sparkConf.get(AM_CORES)
- }
- // scalastyle:on println
- }
private def parseArgs(inputArgs: List[String]): Unit = {
var args = inputArgs
- // scalastyle:off println
while (!args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
@@ -149,88 +51,16 @@ private[spark] class ClientArguments(
primaryRFile = value
args = tail
- case ("--args" | "--arg") :: value :: tail =>
- if (args(0) == "--args") {
- println("--args is deprecated. Use --arg instead.")
- }
+ case ("--arg") :: value :: tail =>
userArgs += value
args = tail
- case ("--master-class" | "--am-class") :: value :: tail =>
- println(s"${args(0)} is deprecated and is not used anymore.")
- args = tail
-
- case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
- if (args(0) == "--master-memory") {
- println("--master-memory is deprecated. Use --driver-memory instead.")
- }
- driverMemory = value
- args = tail
-
- case ("--driver-cores") :: IntParam(value) :: tail =>
- driverCores = value
- args = tail
-
- case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
- if (args(0) == "--num-workers") {
- println("--num-workers is deprecated. Use --num-executors instead.")
- }
- numExecutors = value
- args = tail
-
- case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
- if (args(0) == "--worker-memory") {
- println("--worker-memory is deprecated. Use --executor-memory instead.")
- }
- executorMemory = value
- args = tail
-
- case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
- if (args(0) == "--worker-cores") {
- println("--worker-cores is deprecated. Use --executor-cores instead.")
- }
- executorCores = value
- args = tail
-
- case ("--queue") :: value :: tail =>
- amQueue = value
- args = tail
-
- case ("--name") :: value :: tail =>
- appName = value
- args = tail
-
- case ("--addJars") :: value :: tail =>
- addJars = value
- args = tail
-
- case ("--py-files") :: value :: tail =>
- pyFiles = value.split(",")
- args = tail
-
- case ("--files") :: value :: tail =>
- files = value
- args = tail
-
- case ("--archives") :: value :: tail =>
- archives = value
- args = tail
-
- case ("--principal") :: value :: tail =>
- principal = value
- args = tail
-
- case ("--keytab") :: value :: tail =>
- keytab = value
- args = tail
-
case Nil =>
case _ =>
throw new IllegalArgumentException(getUsageMessage(args))
}
}
- // scalastyle:on println
if (primaryPyFile != null && primaryRFile != null) {
throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
@@ -240,7 +70,6 @@ private[spark] class ClientArguments(
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
- val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
message +
s"""
|Usage: org.apache.spark.deploy.yarn.Client [options]
@@ -252,20 +81,6 @@ private[spark] class ClientArguments(
| --primary-r-file A main R file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
- | --num-executors NUM Number of executors to start (Default: 2)
- | --executor-cores NUM Number of cores per executor (Default: 1).
- | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: $mem_mb Mb)
- | --driver-cores NUM Number of cores used by the driver (Default: 1).
- | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
- | --name NAME The name of your application (Default: Spark)
- | --queue QUEUE The hadoop queue to use for allocation requests (Default:
- | 'default')
- | --addJars jars Comma separated list of local jars that want SparkContext.addJar
- | to work with.
- | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
- | place on the PYTHONPATH for Python apps.
- | --files files Comma separated list of files to be distributed with the job.
- | --archives archives Comma separated list of archives to be distributed with the job.
""".stripMargin
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d094302362..7d71a642f6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -61,7 +62,6 @@ private[yarn] class YarnAllocator(
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
securityMgr: SecurityManager)
extends Logging {
@@ -107,12 +107,12 @@ private[yarn] class YarnAllocator(
private val containerIdToExecutorId = new HashMap[ContainerId, String]
// Executor memory in MB.
- protected val executorMemory = args.executorMemory
+ protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
// Number of cores per executor.
- protected val executorCores = args.executorCores
+ protected val executorCores = sparkConf.get(EXECUTOR_CORES)
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 83d30b7352..e7f7544664 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
/**
* Handles registering and unregistering the application with the YARN ResourceManager.
*/
-private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logging {
+private[spark] class YarnRMClient extends Logging {
private var amClient: AMRMClient[ContainerRequest] = _
private var uiHistoryAddress: String = _
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
- securityMgr)
+ new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2915e664be..5af2c29808 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -512,7 +512,7 @@ object YarnSparkHadoopUtil {
val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
- s"initial executor number $initialNumExecutors must between min executor number" +
+ s"initial executor number $initialNumExecutors must between min executor number " +
s"$minNumExecutors and max executor number $maxNumExecutors")
initialNumExecutors
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 0789567ae6..a3b9134b58 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -85,11 +85,18 @@ package object config {
private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
.stringConf
- .optional
+ .toSequence
+ .withDefault(Nil)
private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
.stringConf
- .optional
+ .toSequence
+ .withDefault(Nil)
+
+ private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
+ .stringConf
+ .toSequence
+ .withDefault(Nil)
private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
.doc("Whether to preserve temporary files created by the job in HDFS.")
@@ -183,7 +190,7 @@ package object config {
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
.intConf
- .optional
+ .withDefault(1)
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
.bytesConf(ByteUnit.MiB)
@@ -191,6 +198,10 @@ package object config {
/* Executor configuration. */
+ private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
+ .intConf
+ .withDefault(1)
+
private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.optional
@@ -245,5 +256,4 @@ package object config {
.stringConf
.toSequence
.optional
-
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9fc727904b..56dc0004d0 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -48,11 +48,10 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += ("--arg", hostport)
- argsArrayBuf ++= getExtraClientArguments
logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
- val args = new ClientArguments(argsArrayBuf.toArray, conf)
- totalExpectedExecutors = args.numExecutors
+ val args = new ClientArguments(argsArrayBuf.toArray)
+ totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
client = new Client(args, conf)
bindToYarn(client.submitApplication(), None)
@@ -73,43 +72,6 @@ private[spark] class YarnClientSchedulerBackend(
}
/**
- * Return any extra command line arguments to be passed to Client provided in the form of
- * environment variables or Spark properties.
- */
- private def getExtraClientArguments: Seq[String] = {
- val extraArgs = new ArrayBuffer[String]
- // List of (target Client argument, environment variable, Spark property)
- val optionTuples =
- List(
- ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
- ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
- ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
- ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
- ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
- ("--py-files", null, "spark.submit.pyFiles")
- )
- // Warn against the following deprecated environment variables: env var -> suggestion
- val deprecatedEnvVars = Map(
- "SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
- "SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
- optionTuples.foreach { case (optionName, envVar, sparkProp) =>
- if (sc.getConf.contains(sparkProp)) {
- extraArgs += (optionName, sc.getConf.get(sparkProp))
- } else if (envVar != null && System.getenv(envVar) != null) {
- extraArgs += (optionName, System.getenv(envVar))
- if (deprecatedEnvVars.contains(envVar)) {
- logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")
- }
- }
- }
- // The app name is a special case because "spark.app.name" is required of all applications.
- // As a result, the corresponding "SPARK_YARN_APP_NAME" is already handled preemptively in
- // SparkSubmitArguments if "spark.app.name" is not explicitly set by the user. (SPARK-5222)
- sc.getConf.getOption("spark.app.name").foreach(v => extraArgs += ("--name", v))
- extraArgs
- }
-
- /**
* Report the state of the application until it is running.
* If the application has finished, failed or been killed in the process, throw an exception.
* This assumes both `client` and `appId` have already been set.
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 64723c361c..2eaafa072a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -118,8 +118,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(SPARK_JARS, Seq(SPARK))
.set(USER_CLASS_PATH_FIRST, true)
+ .set("spark.yarn.dist.jars", ADDED)
val env = new MutableHashMap[String, String]()
- val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+ val args = new ClientArguments(Array("--jar", USER))
populateClasspath(args, conf, sparkConf, env)
@@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("Jar path propagation through SparkConf") {
- val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
- val client = createClient(sparkConf,
- args = Array("--jar", USER, "--addJars", ADDED))
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(SPARK_JARS, Seq(SPARK))
+ .set("spark.yarn.dist.jars", ADDED)
+ val client = createClient(sparkConf, args = Array("--jar", USER))
val tempDir = Utils.createTempDir()
try {
@@ -192,9 +195,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
.set(MAX_APP_ATTEMPTS, 42)
- val args = new ClientArguments(Array(
- "--name", "foo-test-app",
- "--queue", "staging-queue"), sparkConf)
+ .set("spark.app.name", "foo-test-app")
+ .set(QUEUE_NAME, "staging-queue")
+ val args = new ClientArguments(Array())
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -346,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
sparkConf: SparkConf,
conf: Configuration = new Configuration(),
args: Array[String] = Array()): Client = {
- val clientArgs = new ClientArguments(args, sparkConf)
+ val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort())
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0587444a33..a641a6e73e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
- "--executor-cores", "5",
- "--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
val sparkConfClone = sparkConf.clone()
- sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+ sparkConfClone
+ .set("spark.executor.instances", maxExecutors.toString)
+ .set("spark.executor.cores", "5")
+ .set("spark.executor.memory", "2048")
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
@@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new ApplicationMasterArguments(args),
new SecurityManager(sparkConf))
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26520529ec..b2b4d84f53 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testBasicYarnApp(false)
}
+ test("run Spark in yarn-client mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-cluster mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.driver.cores" -> "1",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-client mode with additional jar") {
+ testWithAddJar(true)
+ }
+
+ test("run Spark in yarn-cluster mode with additional jar") {
+ testWithAddJar(false)
+ }
+
test("run Spark in yarn-cluster mode unsuccessfully") {
// Don't provide arguments so the driver will fail.
val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
}
}
- private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
- appArgs = Seq(result.getAbsolutePath()))
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = conf)
checkResult(finalState, result)
}
+ private def testWithAddJar(clientMode: Boolean): Unit = {
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + originalJar.getPath()))
+ checkResult(finalState, driverResult, "ORIGINAL")
+ checkResult(finalState, executorResult, "ORIGINAL")
+ }
+
private def testPySpark(clientMode: Boolean): Unit = {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)