Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala |   2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala |  18
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala |  89
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 189
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala |   6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala |   5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala |   2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala |  18
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala |  42
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala |  19
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala |   8
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala |  46
12 files changed, 138 insertions, 306 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e941089d1b..9e8453429c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -662,7 +662,7 @@ object ApplicationMaster extends Logging {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
- master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
+ master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 6987e5a55f..5cdec87667 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -27,8 +27,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: Seq[String] = Nil
- var executorMemory = 1024
- var executorCores = 1
var propertiesFile: String = null
parseArgs(args.toList)
@@ -58,18 +56,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
primaryRFile = value
args = tail
- case ("--args" | "--arg") :: value :: tail =>
+ case ("--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
- case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
- executorMemory = value
- args = tail
-
- case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
- executorCores = value
- args = tail
-
case ("--properties-file") :: value :: tail =>
propertiesFile = value
args = tail
@@ -101,12 +91,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --primary-r-file A main R file
- | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
- | place on the PYTHONPATH for Python apps.
- | --args ARGS Arguments to be passed to your application's main class.
+ | --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
- | --executor-cores NUM Number of cores for the executors (Default: 1)
- | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --properties-file FILE Path to a custom Spark properties file.
""".stripMargin)
// scalastyle:on println
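With the executor sizing flags removed, the only inputs the application master still takes on its command line are the application description and a --properties-file; executor memory and cores travel inside that properties file as ordinary spark.executor.* settings. A minimal sketch of reading such a file back into a SparkConf (the helper name and details below are illustrative, not the AM's actual loader):

    import java.io.FileInputStream
    import java.util.Properties

    import scala.collection.JavaConverters._

    import org.apache.spark.SparkConf

    // Illustrative only: load the localized properties file into a SparkConf,
    // from which spark.executor.memory / spark.executor.cores are then read.
    def loadAmProperties(propertiesFile: String): SparkConf = {
      val props = new Properties()
      val in = new FileInputStream(propertiesFile)
      try props.load(in) finally in.close()
      val conf = new SparkConf(false)
      props.asScala.foreach { case (k, v) => conf.set(k, v) }
      conf
    }
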
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f0f13a16e0..4dd3ccdf37 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -64,21 +64,44 @@ private[spark] class Client(
extends Logging {
import Client._
+ import YarnSparkHadoopUtil._
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
- private var credentials: Credentials = null
- private val amMemoryOverhead = args.amMemoryOverhead // MB
- private val executorMemoryOverhead = args.executorMemoryOverhead // MB
+
+ private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+
+ // AM related configurations
+ private val amMemory = if (isClusterMode) {
+ sparkConf.get(DRIVER_MEMORY).toInt
+ } else {
+ sparkConf.get(AM_MEMORY).toInt
+ }
+ private val amMemoryOverhead = {
+ val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
+ sparkConf.get(amMemoryOverheadEntry).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+ }
+ private val amCores = if (isClusterMode) {
+ sparkConf.get(DRIVER_CORES)
+ } else {
+ sparkConf.get(AM_CORES)
+ }
+
+ // Executor related configurations
+ private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+
private val distCacheMgr = new ClientDistributedCacheManager()
- private val isClusterMode = args.isClusterMode
private var loginFromKeytab = false
private var principal: String = null
private var keytab: String = null
+ private var credentials: Credentials = null
private val launcherBackend = new LauncherBackend() {
override def onStopRequest(): Unit = {
@@ -179,8 +202,8 @@ private[spark] class Client(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = newApp.getApplicationSubmissionContext
- appContext.setApplicationName(args.appName)
- appContext.setQueue(args.amQueue)
+ appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
+ appContext.setQueue(sparkConf.get(QUEUE_NAME))
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
@@ -217,8 +240,8 @@ private[spark] class Client(
}
val capability = Records.newRecord(classOf[Resource])
- capability.setMemory(args.amMemory + amMemoryOverhead)
- capability.setVirtualCores(args.amCores)
+ capability.setMemory(amMemory + amMemoryOverhead)
+ capability.setVirtualCores(amCores)
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
@@ -272,16 +295,16 @@ private[spark] class Client(
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
- val executorMem = args.executorMemory + executorMemoryOverhead
+ val executorMem = executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+ throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
"Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
"'yarn.nodemanager.resource.memory-mb'.")
}
- val amMem = args.amMemory + amMemoryOverhead
+ val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {
- throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+ throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
"Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
}
@@ -493,17 +516,15 @@ private[spark] class Client(
*/
val cachedSecondaryJarLinks = ListBuffer.empty[String]
List(
- (args.addJars, LocalResourceType.FILE, true),
- (args.files, LocalResourceType.FILE, false),
- (args.archives, LocalResourceType.ARCHIVE, false)
+ (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true),
+ (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false),
+ (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false)
).foreach { case (flist, resType, addToClasspath) =>
- if (flist != null && !flist.isEmpty()) {
- flist.split(',').foreach { file =>
- val (_, localizedPath) = distribute(file, resType = resType)
- require(localizedPath != null)
- if (addToClasspath) {
- cachedSecondaryJarLinks += localizedPath
- }
+ flist.foreach { file =>
+ val (_, localizedPath) = distribute(file, resType = resType)
+ require(localizedPath != null)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += localizedPath
}
}
}
@@ -519,7 +540,7 @@ private[spark] class Client(
// The python files list needs to be treated especially. All files that are not an
// archive need to be placed in a subdirectory that will be added to PYTHONPATH.
- args.pyFiles.foreach { f =>
+ sparkConf.get(PY_FILES).foreach { f =>
val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
distribute(f, targetDir = targetDir)
}
@@ -678,7 +699,7 @@ private[spark] class Client(
//
// NOTE: the code currently does not handle .py files defined with a "local:" scheme.
val pythonPath = new ListBuffer[String]()
- val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+ val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
if (pyFiles.nonEmpty) {
pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_PYTHON_DIR)
@@ -775,7 +796,7 @@ private[spark] class Client(
var prefixEnv: Option[String] = None
// Add Xmx for AM memory
- javaOpts += "-Xmx" + args.amMemory + "m"
+ javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
@@ -879,8 +900,6 @@ private[spark] class Client(
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
userArgs ++ Seq(
- "--executor-memory", args.executorMemory.toString + "m",
- "--executor-cores", args.executorCores.toString,
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
@@ -919,10 +938,10 @@ private[spark] class Client(
}
def setupCredentials(): Unit = {
- loginFromKeytab = args.principal != null || sparkConf.contains(PRINCIPAL.key)
+ loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
if (loginFromKeytab) {
- principal = Option(args.principal).orElse(sparkConf.get(PRINCIPAL)).get
- keytab = Option(args.keytab).orElse(sparkConf.get(KEYTAB)).orNull
+ principal = sparkConf.get(PRINCIPAL).get
+ keytab = sparkConf.get(KEYTAB).orNull
require(keytab != null, "Keytab must be specified when principal is specified.")
logInfo("Attempting to login to the Kerberos" +
@@ -1084,7 +1103,7 @@ private[spark] class Client(
}
-object Client extends Logging {
+private object Client extends Logging {
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
@@ -1097,11 +1116,7 @@ object Client extends Logging {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- val args = new ClientArguments(argStrings, sparkConf)
- // to maintain backwards-compatibility
- if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
- sparkConf.setIfMissing(EXECUTOR_INSTANCES, args.numExecutors)
- }
+ val args = new ClientArguments(argStrings)
new Client(args, sparkConf).run()
}
@@ -1246,7 +1261,7 @@ object Client extends Logging {
val secondaryJars =
if (args != null) {
- getSecondaryJarUris(Option(args.addJars).map(_.split(",").toSeq))
+ getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE)))
} else {
getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 47b4cc3009..61c027ec44 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -19,118 +19,20 @@ package org.apache.spark.deploy.yarn
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.config._
-import org.apache.spark.util.{IntParam, MemoryParam, Utils}
-
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-private[spark] class ClientArguments(
- args: Array[String],
- sparkConf: SparkConf) {
+private[spark] class ClientArguments(args: Array[String]) {
- var addJars: String = null
- var files: String = null
- var archives: String = null
var userJar: String = null
var userClass: String = null
- var pyFiles: Seq[String] = Nil
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
- var executorMemory = 1024 // MB
- var executorCores = 1
- var numExecutors = DEFAULT_NUMBER_EXECUTORS
- var amQueue = sparkConf.get(QUEUE_NAME)
- var amMemory: Int = _
- var amCores: Int = _
- var appName: String = "Spark"
- var priority = 0
- var principal: String = null
- var keytab: String = null
- def isClusterMode: Boolean = userClass != null
-
- private var driverMemory: Int = Utils.DEFAULT_DRIVER_MEM_MB // MB
- private var driverCores: Int = 1
- private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
parseArgs(args.toList)
- loadEnvironmentArgs()
- validateArgs()
-
- // Additional memory to allocate to containers
- val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
- val amMemoryOverhead = sparkConf.get(amMemoryOverheadEntry).getOrElse(
- math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
-
- val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
- math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
-
- /** Load any default arguments provided through environment variables and Spark properties. */
- private def loadEnvironmentArgs(): Unit = {
- // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
- // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
- files = Option(files)
- .orElse(sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
- .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
- .orNull
- archives = Option(archives)
- .orElse(sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
- .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
- .orNull
- // If dynamic allocation is enabled, start at the configured initial number of executors.
- // Default to minExecutors if no initialExecutors is set.
- numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
- principal = Option(principal)
- .orElse(sparkConf.get(PRINCIPAL))
- .orNull
- keytab = Option(keytab)
- .orElse(sparkConf.get(KEYTAB))
- .orNull
- }
-
- /**
- * Fail fast if any arguments provided are invalid.
- * This is intended to be called only after the provided arguments have been parsed.
- */
- private def validateArgs(): Unit = {
- if (numExecutors < 0 || (!isDynamicAllocationEnabled && numExecutors == 0)) {
- throw new IllegalArgumentException(
- s"""
- |Number of executors was $numExecutors, but must be at least 1
- |(or 0 if dynamic executor allocation is enabled).
- |${getUsageMessage()}
- """.stripMargin)
- }
- if (executorCores < sparkConf.get(CPUS_PER_TASK)) {
- throw new SparkException(s"Executor cores must not be less than ${CPUS_PER_TASK.key}.")
- }
- // scalastyle:off println
- if (isClusterMode) {
- for (key <- Seq(AM_MEMORY.key, AM_MEMORY_OVERHEAD.key, AM_CORES.key)) {
- if (sparkConf.contains(key)) {
- println(s"$key is set but does not apply in cluster mode.")
- }
- }
- amMemory = driverMemory
- amCores = driverCores
- } else {
- for (key <- Seq(DRIVER_MEMORY_OVERHEAD.key, DRIVER_CORES.key)) {
- if (sparkConf.contains(key)) {
- println(s"$key is set but does not apply in client mode.")
- }
- }
- amMemory = sparkConf.get(AM_MEMORY).toInt
- amCores = sparkConf.get(AM_CORES)
- }
- // scalastyle:on println
- }
private def parseArgs(inputArgs: List[String]): Unit = {
var args = inputArgs
- // scalastyle:off println
while (!args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
@@ -149,88 +51,16 @@ private[spark] class ClientArguments(
primaryRFile = value
args = tail
- case ("--args" | "--arg") :: value :: tail =>
- if (args(0) == "--args") {
- println("--args is deprecated. Use --arg instead.")
- }
+ case ("--arg") :: value :: tail =>
userArgs += value
args = tail
- case ("--master-class" | "--am-class") :: value :: tail =>
- println(s"${args(0)} is deprecated and is not used anymore.")
- args = tail
-
- case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
- if (args(0) == "--master-memory") {
- println("--master-memory is deprecated. Use --driver-memory instead.")
- }
- driverMemory = value
- args = tail
-
- case ("--driver-cores") :: IntParam(value) :: tail =>
- driverCores = value
- args = tail
-
- case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
- if (args(0) == "--num-workers") {
- println("--num-workers is deprecated. Use --num-executors instead.")
- }
- numExecutors = value
- args = tail
-
- case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
- if (args(0) == "--worker-memory") {
- println("--worker-memory is deprecated. Use --executor-memory instead.")
- }
- executorMemory = value
- args = tail
-
- case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
- if (args(0) == "--worker-cores") {
- println("--worker-cores is deprecated. Use --executor-cores instead.")
- }
- executorCores = value
- args = tail
-
- case ("--queue") :: value :: tail =>
- amQueue = value
- args = tail
-
- case ("--name") :: value :: tail =>
- appName = value
- args = tail
-
- case ("--addJars") :: value :: tail =>
- addJars = value
- args = tail
-
- case ("--py-files") :: value :: tail =>
- pyFiles = value.split(",")
- args = tail
-
- case ("--files") :: value :: tail =>
- files = value
- args = tail
-
- case ("--archives") :: value :: tail =>
- archives = value
- args = tail
-
- case ("--principal") :: value :: tail =>
- principal = value
- args = tail
-
- case ("--keytab") :: value :: tail =>
- keytab = value
- args = tail
-
case Nil =>
case _ =>
throw new IllegalArgumentException(getUsageMessage(args))
}
}
- // scalastyle:on println
if (primaryPyFile != null && primaryRFile != null) {
throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
@@ -240,7 +70,6 @@ private[spark] class ClientArguments(
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
- val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
message +
s"""
|Usage: org.apache.spark.deploy.yarn.Client [options]
@@ -252,20 +81,6 @@ private[spark] class ClientArguments(
| --primary-r-file A main R file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
- | --num-executors NUM Number of executors to start (Default: 2)
- | --executor-cores NUM Number of cores per executor (Default: 1).
- | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: $mem_mb Mb)
- | --driver-cores NUM Number of cores used by the driver (Default: 1).
- | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
- | --name NAME The name of your application (Default: Spark)
- | --queue QUEUE The hadoop queue to use for allocation requests (Default:
- | 'default')
- | --addJars jars Comma separated list of local jars that want SparkContext.addJar
- | to work with.
- | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
- | place on the PYTHONPATH for Python apps.
- | --files files Comma separated list of files to be distributed with the job.
- | --archives archives Comma separated list of archives to be distributed with the job.
""".stripMargin
}
}
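For reference, a sketch of how a submission that used the deleted flags is now expressed: everything except the application itself becomes a Spark setting (the jar, class, and argument values below are illustrative):

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      .set("spark.app.name", "foo-app")                       // was --name
      .set("spark.yarn.queue", "staging-queue")               // was --queue
      .set("spark.executor.instances", "4")                   // was --num-executors
      .set("spark.executor.memory", "2g")                     // was --executor-memory
      .set("spark.executor.cores", "2")                       // was --executor-cores
      .set("spark.yarn.dist.jars", "extra1.jar,extra2.jar")   // was --addJars
      .set("spark.yarn.dist.files", "data.txt")               // was --files

    // Only the user application itself is still described on the command line:
    val args = new ClientArguments(Array(
      "--jar", "app.jar",
      "--class", "com.example.Main",
      "--arg", "one", "--arg", "two"))
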
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d094302362..7d71a642f6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -61,7 +62,6 @@ private[yarn] class YarnAllocator(
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
securityMgr: SecurityManager)
extends Logging {
@@ -107,12 +107,12 @@ private[yarn] class YarnAllocator(
private val containerIdToExecutorId = new HashMap[ContainerId, String]
// Executor memory in MB.
- protected val executorMemory = args.executorMemory
+ protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
// Number of cores per executor.
- protected val executorCores = args.executorCores
+ protected val executorCores = sparkConf.get(EXECUTOR_CORES)
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
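The container size requested per executor is now derived entirely from SparkConf. A worked example, assuming the usual overhead constants from YarnSparkHadoopUtil (factor 0.10, floor 384 MB):

    // spark.executor.memory = 4g and no explicit spark.yarn.executor.memoryOverhead:
    val executorMemory = 4096                                          // MB
    val memoryOverhead = math.max((0.10 * executorMemory).toInt, 384)  // max(409, 384) = 409
    val containerMemory = executorMemory + memoryOverhead              // 4505 MB requested
    // vcores come straight from spark.executor.cores (default 1, per config.scala below).
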
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 83d30b7352..e7f7544664 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
/**
* Handles registering and unregistering the application with the YARN ResourceManager.
*/
-private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logging {
+private[spark] class YarnRMClient extends Logging {
private var amClient: AMRMClient[ContainerRequest] = _
private var uiHistoryAddress: String = _
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
- securityMgr)
+ new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2915e664be..5af2c29808 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -512,7 +512,7 @@ object YarnSparkHadoopUtil {
val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
- s"initial executor number $initialNumExecutors must between min executor number" +
+ s"initial executor number $initialNumExecutors must between min executor number " +
s"$minNumExecutors and max executor number $maxNumExecutors")
initialNumExecutors
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 0789567ae6..a3b9134b58 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -85,11 +85,18 @@ package object config {
private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
.stringConf
- .optional
+ .toSequence
+ .withDefault(Nil)
private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
.stringConf
- .optional
+ .toSequence
+ .withDefault(Nil)
+
+ private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
+ .stringConf
+ .toSequence
+ .withDefault(Nil)
private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
.doc("Whether to preserve temporary files created by the job in HDFS.")
@@ -183,7 +190,7 @@ package object config {
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
.intConf
- .optional
+ .withDefault(1)
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
.bytesConf(ByteUnit.MiB)
@@ -191,6 +198,10 @@ package object config {
/* Executor configuration. */
+ private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
+ .intConf
+ .withDefault(1)
+
private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.optional
@@ -245,5 +256,4 @@ package object config {
.stringConf
.toSequence
.optional
-
}
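The new entries follow the ConfigBuilder pattern used throughout this package object; a compact sketch of what a sequence-valued entry gives callers (ConfigBuilder is Spark-internal API, shown here only for illustration):

    // Defined once in the package object:
    private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
      .stringConf
      .toSequence        // a comma-separated value is parsed into a Seq[String]
      .withDefault(Nil)  // so callers never see null or an empty string

    // Read back as a typed value, replacing the old null/isEmpty/split(',') handling:
    val jars: Seq[String] = sparkConf.get(JARS_TO_DISTRIBUTE)
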
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9fc727904b..56dc0004d0 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -48,11 +48,10 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += ("--arg", hostport)
- argsArrayBuf ++= getExtraClientArguments
logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
- val args = new ClientArguments(argsArrayBuf.toArray, conf)
- totalExpectedExecutors = args.numExecutors
+ val args = new ClientArguments(argsArrayBuf.toArray)
+ totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
client = new Client(args, conf)
bindToYarn(client.submitApplication(), None)
@@ -73,43 +72,6 @@ private[spark] class YarnClientSchedulerBackend(
}
/**
- * Return any extra command line arguments to be passed to Client provided in the form of
- * environment variables or Spark properties.
- */
- private def getExtraClientArguments: Seq[String] = {
- val extraArgs = new ArrayBuffer[String]
- // List of (target Client argument, environment variable, Spark property)
- val optionTuples =
- List(
- ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
- ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
- ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
- ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
- ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
- ("--py-files", null, "spark.submit.pyFiles")
- )
- // Warn against the following deprecated environment variables: env var -> suggestion
- val deprecatedEnvVars = Map(
- "SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
- "SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
- optionTuples.foreach { case (optionName, envVar, sparkProp) =>
- if (sc.getConf.contains(sparkProp)) {
- extraArgs += (optionName, sc.getConf.get(sparkProp))
- } else if (envVar != null && System.getenv(envVar) != null) {
- extraArgs += (optionName, System.getenv(envVar))
- if (deprecatedEnvVars.contains(envVar)) {
- logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")
- }
- }
- }
- // The app name is a special case because "spark.app.name" is required of all applications.
- // As a result, the corresponding "SPARK_YARN_APP_NAME" is already handled preemptively in
- // SparkSubmitArguments if "spark.app.name" is not explicitly set by the user. (SPARK-5222)
- sc.getConf.getOption("spark.app.name").foreach(v => extraArgs += ("--name", v))
- extraArgs
- }
-
- /**
* Report the state of the application until it is running.
* If the application has finished, failed or been killed in the process, throw an exception.
* This assumes both `client` and `appId` have already been set.
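With getExtraClientArguments gone, the expected executor count comes from YarnSparkHadoopUtil.getInitialTargetExecutorNumber rather than from ClientArguments. A hedged sketch of that derivation (key names are real, defaults are assumptions):

    import org.apache.spark.SparkConf

    // Roughly what getInitialTargetExecutorNumber does, for illustration only.
    def initialTargetExecutors(conf: SparkConf): Int = {
      if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        // Start at the configured initial count, defaulting to the minimum.
        conf.getInt("spark.dynamicAllocation.initialExecutors",
          conf.getInt("spark.dynamicAllocation.minExecutors", 0))
      } else {
        conf.getInt("spark.executor.instances", 2)   // DEFAULT_NUMBER_EXECUTORS
      }
    }
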
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 64723c361c..2eaafa072a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -118,8 +118,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(SPARK_JARS, Seq(SPARK))
.set(USER_CLASS_PATH_FIRST, true)
+ .set("spark.yarn.dist.jars", ADDED)
val env = new MutableHashMap[String, String]()
- val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+ val args = new ClientArguments(Array("--jar", USER))
populateClasspath(args, conf, sparkConf, env)
@@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("Jar path propagation through SparkConf") {
- val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
- val client = createClient(sparkConf,
- args = Array("--jar", USER, "--addJars", ADDED))
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(SPARK_JARS, Seq(SPARK))
+ .set("spark.yarn.dist.jars", ADDED)
+ val client = createClient(sparkConf, args = Array("--jar", USER))
val tempDir = Utils.createTempDir()
try {
@@ -192,9 +195,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
.set(MAX_APP_ATTEMPTS, 42)
- val args = new ClientArguments(Array(
- "--name", "foo-test-app",
- "--queue", "staging-queue"), sparkConf)
+ .set("spark.app.name", "foo-test-app")
+ .set(QUEUE_NAME, "staging-queue")
+ val args = new ClientArguments(Array())
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -346,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
sparkConf: SparkConf,
conf: Configuration = new Configuration(),
args: Array[String] = Array()): Client = {
- val clientArgs = new ClientArguments(args, sparkConf)
+ val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort())
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0587444a33..a641a6e73e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
- "--executor-cores", "5",
- "--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
val sparkConfClone = sparkConf.clone()
- sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+ sparkConfClone
+ .set("spark.executor.instances", maxExecutors.toString)
+ .set("spark.executor.cores", "5")
+ .set("spark.executor.memory", "2048")
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
@@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new ApplicationMasterArguments(args),
new SecurityManager(sparkConf))
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26520529ec..b2b4d84f53 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testBasicYarnApp(false)
}
+ test("run Spark in yarn-client mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-cluster mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.driver.cores" -> "1",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-client mode with additional jar") {
+ testWithAddJar(true)
+ }
+
+ test("run Spark in yarn-cluster mode with additional jar") {
+ testWithAddJar(false)
+ }
+
test("run Spark in yarn-cluster mode unsuccessfully") {
// Don't provide arguments so the driver will fail.
val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
}
}
- private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
- appArgs = Seq(result.getAbsolutePath()))
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = conf)
checkResult(finalState, result)
}
+ private def testWithAddJar(clientMode: Boolean): Unit = {
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + originalJar.getPath()))
+ checkResult(finalState, driverResult, "ORIGINAL")
+ checkResult(finalState, executorResult, "ORIGINAL")
+ }
+
private def testPySpark(clientMode: Boolean): Unit = {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)