diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 44 |
1 files changed, 17 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4049fc0c41..926e1ff7a8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -441,7 +441,6 @@ object SparkSubmit { OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.submit.deployMode"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"), @@ -452,27 +451,15 @@ object SparkSubmit { OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.driver.extraLibraryPath"), - // Yarn client only - OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), + // Yarn only + OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"), - OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), - OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"), - OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"), - - // Yarn cluster only - OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"), - OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), - OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"), - OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), - OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), - OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"), - OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"), - OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"), - OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), - OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"), - OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"), + OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"), + OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"), + OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), + OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"), // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, @@ -483,10 +470,11 @@ object SparkSubmit { sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.files"), - OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"), - OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER, + OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), + OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.memory"), - OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER, + OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.cores"), OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, sysProp = "spark.driver.supervise"), @@ -550,6 +538,10 @@ object SparkSubmit { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } + + if (args.pyFiles != null) { + sysProps("spark.submit.pyFiles") = args.pyFiles + } } // assure a keytab is available from any place in a JVM @@ -576,9 +568,6 @@ object SparkSubmit { childMainClass = "org.apache.spark.deploy.yarn.Client" if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) - if (args.pyFiles != null) { - childArgs += ("--py-files", args.pyFiles) - } childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") } else if (args.isR) { val mainFile = new Path(args.primaryResource).getName @@ -627,7 +616,8 @@ object SparkSubmit { "spark.jars", "spark.files", "spark.yarn.dist.files", - "spark.yarn.dist.archives") + "spark.yarn.dist.archives", + "spark.yarn.dist.jars") pathConfigs.foreach { config => // Replace old URIs with resolved URIs, if they exist sysProps.get(config).foreach { oldValue => |