diff options
author | jerryshao <sshao@hortonworks.com> | 2016-04-01 10:52:13 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-04-01 10:52:13 -0700 |
commit | 8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e (patch) | |
tree | bbe9310838ffdf087dc628d4c9993ba58c932ce1 /core/src/main | |
parent | 58e6bc827f1f9dc1afee07dca1bee1f56553dd20 (diff) | |
download | spark-8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e.tar.gz spark-8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e.tar.bz2 spark-8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e.zip |
[SPARK-12343][YARN] Simplify Yarn client and client argument
## What changes were proposed in this pull request?
Currently in Spark on YARN, configurations can be passed through SparkConf, environment variables, and command-line arguments; some parts are duplicated, such as the client arguments and SparkConf. This change therefore proposes to simplify the command-line arguments.
## How was this patch tested?
This patch was tested manually and with unit tests.
CC vanzin tgravescs, please help review this proposal. The original purpose of this JIRA was to remove `ClientArguments` entirely; however, during refactoring it turned out that some arguments, such as `--class` and `--arg`, are not easy to replace. So here I remove most of the command-line arguments and keep only the minimal set.
Author: jerryshao <sshao@hortonworks.com>
Closes #11603 from jerryshao/SPARK-12343.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 44 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/internal/config/package.scala | 14 |
2 files changed, 31 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4049fc0c41..926e1ff7a8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -441,7 +441,6 @@ object SparkSubmit { OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.submit.deployMode"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"), @@ -452,27 +451,15 @@ object SparkSubmit { OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.driver.extraLibraryPath"), - // Yarn client only - OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), + // Yarn only + OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"), - OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), - OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"), - OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"), - - // Yarn cluster only - OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"), - OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), - OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"), - OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), - OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), - 
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"), - OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"), - OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"), - OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), - OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"), - OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"), + OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"), + OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"), + OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), + OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"), // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, @@ -483,10 +470,11 @@ object SparkSubmit { sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.files"), - OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"), - OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER, + OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), + OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.memory"), - OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER, + OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.cores"), OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, sysProp = "spark.driver.supervise"), @@ -550,6 +538,10 @@ object SparkSubmit { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } + + if (args.pyFiles != null) { + 
sysProps("spark.submit.pyFiles") = args.pyFiles + } } // assure a keytab is available from any place in a JVM @@ -576,9 +568,6 @@ object SparkSubmit { childMainClass = "org.apache.spark.deploy.yarn.Client" if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) - if (args.pyFiles != null) { - childArgs += ("--py-files", args.pyFiles) - } childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") } else if (args.isR) { val mainFile = new Path(args.primaryResource).getName @@ -627,7 +616,8 @@ object SparkSubmit { "spark.jars", "spark.files", "spark.yarn.dist.files", - "spark.yarn.dist.archives") + "spark.yarn.dist.archives", + "spark.yarn.dist.jars") pathConfigs.foreach { config => // Replace old URIs with resolved URIs, if they exist sysProps.get(config).foreach { oldValue => diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f2f20b3207..968c5192ac 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -18,6 +18,7 @@ package org.apache.spark.internal import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.ByteUnit package object config { @@ -33,6 +34,10 @@ package object config { private[spark] val DRIVER_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false) + private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") + .bytesConf(ByteUnit.MiB) + .withDefaultString("1g") + private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional @@ -45,6 +50,10 @@ package object config { private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false) + private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory") + 
.bytesConf(ByteUnit.MiB) + .withDefaultString("1g") + private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal .booleanConf.withDefault(false) @@ -73,4 +82,9 @@ package object config { private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional + private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles") + .internal + .stringConf + .toSequence + .withDefault(Nil) } |