author    jerryshao <sshao@hortonworks.com>      2016-04-01 10:52:13 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>   2016-04-01 10:52:13 -0700
commit    8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e (patch)
tree      bbe9310838ffdf087dc628d4c9993ba58c932ce1
parent    58e6bc827f1f9dc1afee07dca1bee1f56553dd20 (diff)
[SPARK-12343][YARN] Simplify Yarn client and client argument
## What changes were proposed in this pull request?

Currently in Spark on YARN, configurations can be passed through SparkConf, the environment, and command line arguments, and some parts are duplicated, e.g. client arguments and SparkConf. This change therefore simplifies the command line arguments.

## How was this patch tested?

This patch was tested manually and with unit tests.

CC vanzin tgravescs, please help review this proposal.

The original purpose of this JIRA was to remove `ClientArguments` entirely, but during refactoring some arguments such as `--class` and `--arg` turned out to be hard to replace, so this change removes most of the command line arguments and keeps only a minimal set.

Author: jerryshao <sshao@hortonworks.com>

Closes #11603 from jerryshao/SPARK-12343.
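As a hedged sketch of what this consolidation means in practice (the property names are taken from the diff below; the values are illustrative), the YARN queue and distribution settings become ordinary SparkConf properties in both client and cluster mode, where several of them were previously cluster-only command line flags:

```scala
import org.apache.spark.SparkConf

// After this change the same SparkConf works in client and cluster mode;
// previously --queue, --files, --archives and --addJars were translated
// into cluster-only child arguments for org.apache.spark.deploy.yarn.Client.
val conf = new SparkConf()
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.dist.jars", "hdfs:///libs/extra.jar")
  .set("spark.yarn.dist.files", "hdfs:///conf/app.properties")
  .set("spark.yarn.dist.archives", "hdfs:///archives/env.zip")
```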
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala      | 44
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala | 14
2 files changed, 31 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4049fc0c41..926e1ff7a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -441,7 +441,6 @@ object SparkSubmit {
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
- OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
@@ -452,27 +451,15 @@ object SparkSubmit {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),
- // Yarn client only
- OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+ // Yarn only
+ OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
- OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
- OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
- OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
- OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
-
- // Yarn cluster only
- OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
- OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
- OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
- OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
- OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
- OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
- OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
- OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
- OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
- OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"),
- OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"),
+ OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
+ OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
+ OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
+ OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
+ OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),
// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
@@ -483,10 +470,11 @@ object SparkSubmit {
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
- OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
- OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
+ OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
+ OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+ OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.memory"),
- OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
+ OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
@@ -550,6 +538,10 @@ object SparkSubmit {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}
+
+ if (args.pyFiles != null) {
+ sysProps("spark.submit.pyFiles") = args.pyFiles
+ }
}
// assure a keytab is available from any place in a JVM
@@ -576,9 +568,6 @@ object SparkSubmit {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
- if (args.pyFiles != null) {
- childArgs += ("--py-files", args.pyFiles)
- }
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
@@ -627,7 +616,8 @@ object SparkSubmit {
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
- "spark.yarn.dist.archives")
+ "spark.yarn.dist.archives",
+ "spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sysProps.get(config).foreach { oldValue =>
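The hunks above collapse the YARN client/cluster split into single OptionAssigner entries keyed by cluster manager and deploy mode. A simplified sketch of how SparkSubmit consumes such a table (the real loop in SparkSubmit.scala handles more cases; the helper name `applyOptions` is illustrative):

```scala
import scala.collection.mutable

case class OptionAssigner(
    value: String,
    clusterManager: Int,
    deployMode: Int,
    clOption: String = null,
    sysProp: String = null)

// Each assigner contributes its value as a child process argument and/or a
// system property, but only when the submission's cluster manager and
// deploy mode both match the assigner's bitmasks.
def applyOptions(
    options: Seq[OptionAssigner],
    clusterManager: Int,
    deployMode: Int,
    sysProps: mutable.Map[String, String],
    childArgs: mutable.ArrayBuffer[String]): Unit = {
  for (opt <- options) {
    if (opt.value != null &&
        (deployMode & opt.deployMode) != 0 &&
        (clusterManager & opt.clusterManager) != 0) {
      if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
      if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
    }
  }
}
```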
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f2f20b3207..968c5192ac 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -18,6 +18,7 @@
package org.apache.spark.internal
import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.ByteUnit
package object config {
@@ -33,6 +34,10 @@ package object config {
private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+ private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
+ .bytesConf(ByteUnit.MiB)
+ .withDefaultString("1g")
+
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
@@ -45,6 +50,10 @@ package object config {
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+ private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
+ .bytesConf(ByteUnit.MiB)
+ .withDefaultString("1g")
+
private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
.booleanConf.withDefault(false)
@@ -73,4 +82,9 @@ package object config {
private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+ private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
+ .internal
+ .stringConf
+ .toSequence
+ .withDefault(Nil)
}
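The new ConfigBuilder entries give Spark-internal code typed access to these settings: bytesConf(ByteUnit.MiB) parses size strings such as "2g" and reports them in MiB, and toSequence splits the comma-separated list. A small usage sketch (the entries are private[spark], so this only compiles inside Spark's own code; the values are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf()
  .set("spark.driver.memory", "2g")
  .set("spark.submit.pyFiles", "deps.py,more_deps.py")

// bytesConf(ByteUnit.MiB) normalizes "2g" to mebibytes:
val driverMemMiB: Long = conf.get(DRIVER_MEMORY)   // 2048

// toSequence splits the comma-separated property into a Seq:
val pyFiles: Seq[String] = conf.get(PY_FILES)      // Seq("deps.py", "more_deps.py")
```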