Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 157
1 file changed, 69 insertions(+), 88 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 82e66a3742..94e4bdbfb7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -18,18 +18,22 @@
package org.apache.spark.deploy
import java.net.URI
+import java.util.{List => JList}
import java.util.jar.JarFile
+import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.util.Utils
/**
* Parses and encapsulates arguments from the spark-submit script.
* The env argument is used for testing.
*/
-private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
+ extends SparkSubmitArgumentsParser {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -84,7 +88,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
// Set parameters from command line arguments
- parseOpts(args.toList)
+ try {
+ parse(args.toList)
+ } catch {
+ case e: IllegalArgumentException =>
+ SparkSubmit.printErrorAndExit(e.getMessage())
+ }
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
@@ -277,167 +286,139 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
""".stripMargin
}
- /**
- * Fill in values by parsing user options.
- * NOTE: Any changes here must be reflected in YarnClientSchedulerBackend.
- */
- private def parseOpts(opts: Seq[String]): Unit = {
- val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
-
- // Delineates parsing of Spark options from parsing of user options.
- parse(opts)
-
- /**
- * NOTE: If you add or remove spark-submit options,
- * modify NOT ONLY this file but also utils.sh
- */
- def parse(opts: Seq[String]): Unit = opts match {
- case ("--name") :: value :: tail =>
+ /** Fill in values by parsing user options. */
+ override protected def handle(opt: String, value: String): Boolean = {
+ opt match {
+ case NAME =>
name = value
- parse(tail)
- case ("--master") :: value :: tail =>
+ case MASTER =>
master = value
- parse(tail)
- case ("--class") :: value :: tail =>
+ case CLASS =>
mainClass = value
- parse(tail)
- case ("--deploy-mode") :: value :: tail =>
+ case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
- parse(tail)
- case ("--num-executors") :: value :: tail =>
+ case NUM_EXECUTORS =>
numExecutors = value
- parse(tail)
- case ("--total-executor-cores") :: value :: tail =>
+ case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
- parse(tail)
- case ("--executor-cores") :: value :: tail =>
+ case EXECUTOR_CORES =>
executorCores = value
- parse(tail)
- case ("--executor-memory") :: value :: tail =>
+ case EXECUTOR_MEMORY =>
executorMemory = value
- parse(tail)
- case ("--driver-memory") :: value :: tail =>
+ case DRIVER_MEMORY =>
driverMemory = value
- parse(tail)
- case ("--driver-cores") :: value :: tail =>
+ case DRIVER_CORES =>
driverCores = value
- parse(tail)
- case ("--driver-class-path") :: value :: tail =>
+ case DRIVER_CLASS_PATH =>
driverExtraClassPath = value
- parse(tail)
- case ("--driver-java-options") :: value :: tail =>
+ case DRIVER_JAVA_OPTIONS =>
driverExtraJavaOptions = value
- parse(tail)
- case ("--driver-library-path") :: value :: tail =>
+ case DRIVER_LIBRARY_PATH =>
driverExtraLibraryPath = value
- parse(tail)
- case ("--properties-file") :: value :: tail =>
+ case PROPERTIES_FILE =>
propertiesFile = value
- parse(tail)
- case ("--kill") :: value :: tail =>
+ case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
}
action = KILL
- parse(tail)
- case ("--status") :: value :: tail =>
+ case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS
- parse(tail)
- case ("--supervise") :: tail =>
+ case SUPERVISE =>
supervise = true
- parse(tail)
- case ("--queue") :: value :: tail =>
+ case QUEUE =>
queue = value
- parse(tail)
- case ("--files") :: value :: tail =>
+ case FILES =>
files = Utils.resolveURIs(value)
- parse(tail)
- case ("--py-files") :: value :: tail =>
+ case PY_FILES =>
pyFiles = Utils.resolveURIs(value)
- parse(tail)
- case ("--archives") :: value :: tail =>
+ case ARCHIVES =>
archives = Utils.resolveURIs(value)
- parse(tail)
- case ("--jars") :: value :: tail =>
+ case JARS =>
jars = Utils.resolveURIs(value)
- parse(tail)
- case ("--packages") :: value :: tail =>
+ case PACKAGES =>
packages = value
- parse(tail)
- case ("--repositories") :: value :: tail =>
+ case REPOSITORIES =>
repositories = value
- parse(tail)
- case ("--conf" | "-c") :: value :: tail =>
+ case CONF =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
}
- parse(tail)
- case ("--proxy-user") :: value :: tail =>
+ case PROXY_USER =>
proxyUser = value
- parse(tail)
- case ("--help" | "-h") :: tail =>
+ case HELP =>
printUsageAndExit(0)
- case ("--verbose" | "-v") :: tail =>
+ case VERBOSE =>
verbose = true
- parse(tail)
- case ("--version") :: tail =>
+ case VERSION =>
SparkSubmit.printVersionAndExit()
- case EQ_SEPARATED_OPT(opt, value) :: tail =>
- parse(opt :: value :: tail)
+ case _ =>
+ throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+ }
+ true
+ }
- case value :: tail if value.startsWith("-") =>
- SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+ /**
+ * Handle unrecognized command line options.
+ *
+ * The first unrecognized option is treated as the "primary resource". Everything else is
+ * treated as application arguments.
+ */
+ override protected def handleUnknown(opt: String): Boolean = {
+ if (opt.startsWith("-")) {
+ SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+ }
- case value :: tail =>
- primaryResource =
- if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
- Utils.resolveURI(value).toString
- } else {
- value
- }
- isPython = SparkSubmit.isPython(value)
- childArgs ++= tail
+ primaryResource =
+ if (!SparkSubmit.isShell(opt) && !SparkSubmit.isInternal(opt)) {
+ Utils.resolveURI(opt).toString
+ } else {
+ opt
+ }
+ isPython = SparkSubmit.isPython(opt)
+ false
+ }
- case Nil =>
- }
+ override protected def handleExtraArgs(extra: JList[String]): Unit = {
+ childArgs ++= extra
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
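
For context on the pattern this commit adopts: instead of recursively pattern-matching on the raw argument list, SparkSubmitArguments now extends a launcher-side parser and fills in values through three callbacks, handle(opt, value) for recognized options, handleUnknown(opt) for the first unrecognized token (the primary resource), and handleExtraArgs(extra) for everything after it. The sketch below is a hypothetical, stripped-down stand-in for that contract, not the real SparkSubmitArgumentsParser imported from org.apache.spark.launcher; the SimpleOptionParser and DemoArguments names and the two option constants are illustrative only.

import java.util.{ArrayList, List => JList}

// Hypothetical minimal parser exposing the same three callbacks the commit overrides.
abstract class SimpleOptionParser {
  protected val MASTER = "--master"
  protected val NAME = "--name"
  private val knownOpts = Set(MASTER, NAME)

  protected def handle(opt: String, value: String): Boolean
  protected def handleUnknown(opt: String): Boolean
  protected def handleExtraArgs(extra: JList[String]): Unit

  /** Walk the arguments, dispatching to the callbacks above. */
  def parse(args: List[String]): Unit = args match {
    case opt :: value :: tail if knownOpts.contains(opt) =>
      if (handle(opt, value)) parse(tail)
    case unknown :: tail =>
      if (handleUnknown(unknown)) {
        parse(tail)
      } else {
        // The first unrecognized token ends option parsing; the rest are app args.
        val extra = new ArrayList[String]()
        tail.foreach(extra.add)
        handleExtraArgs(extra)
      }
    case Nil => // done
  }
}

// Tiny subclass mirroring how the new SparkSubmitArguments code uses the callbacks.
class DemoArguments extends SimpleOptionParser {
  import scala.collection.JavaConversions._
  import scala.collection.mutable.ArrayBuffer

  var master: String = null
  var primaryResource: String = null
  val childArgs = ArrayBuffer[String]()

  override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case MASTER => master = value
      case _ => throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
    }
    true
  }

  override protected def handleUnknown(opt: String): Boolean = {
    primaryResource = opt
    false // stop treating the remaining tokens as options
  }

  override protected def handleExtraArgs(extra: JList[String]): Unit = {
    childArgs ++= extra
  }
}

// Example: new DemoArguments().parse(List("--master", "local[2]", "app.jar", "arg1"))
// leaves master = "local[2]", primaryResource = "app.jar", childArgs = ["arg1"].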