Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 289
1 file changed, 145 insertions(+), 144 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b050dccb6d..3d8373d817 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -27,25 +27,39 @@ import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
/**
- * Scala code behind the spark-submit script. The script handles setting up the classpath with
- * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
- * modes that Spark supports.
+ * Main gateway of launching a Spark application.
+ *
+ * This program handles setting up the classpath with relevant Spark dependencies and provides
+ * a layer over the different cluster managers and deploy modes that Spark supports.
*/
object SparkSubmit {
+
+ // Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
- private var clusterManager: Int = LOCAL
+ // Deploy modes
+ private val CLIENT = 1
+ private val CLUSTER = 2
+ private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
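[Annotation: the managers and modes are declared as powers of two so that any set of them packs into one Int and membership reduces to a bitwise AND, which is exactly the test the option-mapping loop further down performs. A minimal, self-contained sketch, with the constant values copied from this patch:

    // Cluster-manager flags as in the patch; each occupies a distinct bit.
    val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
    val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL  // == 15

    // Membership in a combined mask is a single bitwise AND.
    def supports(mask: Int, manager: Int): Boolean = (mask & manager) != 0

    assert(supports(STANDALONE | YARN, YARN))    // YARN bit is set
    assert(!supports(STANDALONE | YARN, MESOS))  // MESOS bit is not
]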
- /**
- * Special primary resource names that represent shells rather than application jars.
- */
+ // Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
+ // Exposed for testing
+ private[spark] var exitFn: () => Unit = () => System.exit(-1)
+ private[spark] var printStream: PrintStream = System.err
+ private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
+ private[spark] def printErrorAndExit(str: String) = {
+ printStream.println("Error: " + str)
+ printStream.println("Run with --help for usage help or --verbose for debug output")
+ exitFn()
+ }
+
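[Annotation: because exitFn and printStream are plain vars, a test can swap them out and observe failures without killing the JVM. A hedged sketch of such a test — the capturing buffer and the marker exception are illustrative, and since the hooks are private[spark] this only compiles from code under the org.apache.spark package:

    import java.io.{ByteArrayOutputStream, PrintStream}

    val buffer = new ByteArrayOutputStream()
    SparkSubmit.printStream = new PrintStream(buffer)
    SparkSubmit.exitFn = () => throw new RuntimeException("exit requested")

    try {
      SparkSubmit.printErrorAndExit("bad arguments")
    } catch {
      case _: RuntimeException => // expected: exitFn fired instead of System.exit
    }
    assert(buffer.toString.contains("Error: bad arguments"))
]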
def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
@@ -55,88 +69,80 @@ object SparkSubmit {
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}
- // Exposed for testing
- private[spark] var printStream: PrintStream = System.err
- private[spark] var exitFn: () => Unit = () => System.exit(-1)
-
- private[spark] def printErrorAndExit(str: String) = {
- printStream.println("Error: " + str)
- printStream.println("Run with --help for usage help or --verbose for debug output")
- exitFn()
- }
- private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
-
/**
- * @return a tuple containing the arguments for the child, a list of classpath
- * entries for the child, a list of system properties, a list of env vars
- * and the main class for the child
+ * @return a tuple containing
+ * (1) the arguments for the child process,
+ * (2) a list of classpath entries for the child,
+ * (3) a list of system properties and env vars, and
+ * (4) the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
- if (args.master.startsWith("local")) {
- clusterManager = LOCAL
- } else if (args.master.startsWith("yarn")) {
- clusterManager = YARN
- } else if (args.master.startsWith("spark")) {
- clusterManager = STANDALONE
- } else if (args.master.startsWith("mesos")) {
- clusterManager = MESOS
- } else {
- printErrorAndExit("Master must start with yarn, mesos, spark, or local")
- }
-
- // Because "yarn-cluster" and "yarn-client" encapsulate both the master
- // and deploy mode, we have some logic to infer the master and deploy mode
- // from each other if only one is specified, or exit early if they are at odds.
- if (args.deployMode == null &&
- (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
- args.deployMode = "cluster"
- }
- if (args.deployMode == "cluster" && args.master == "yarn-client") {
- printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
- }
- if (args.deployMode == "client" &&
- (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
- printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
- + "\" are not compatible")
- }
- if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
- args.master = "yarn-cluster"
- }
- if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
- args.master = "yarn-client"
- }
-
- val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
- val childClasspath = new ArrayBuffer[String]()
+ // Values to return
val childArgs = new ArrayBuffer[String]()
+ val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""
- val isPython = args.isPython
- val isYarnCluster = clusterManager == YARN && deployOnCluster
+ // Set the cluster manager
+ val clusterManager: Int = args.master match {
+ case m if m.startsWith("yarn") => YARN
+ case m if m.startsWith("spark") => STANDALONE
+ case m if m.startsWith("mesos") => MESOS
+ case m if m.startsWith("local") => LOCAL
+ case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
+ }
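[Annotation: for illustration, the same prefix dispatch as a standalone function, exercised with a few representative master URLs (the URLs are examples, not from the patch):

    def managerFor(master: String): Int = master match {
      case m if m.startsWith("yarn")  => 1  // YARN
      case m if m.startsWith("spark") => 2  // STANDALONE
      case m if m.startsWith("mesos") => 4  // MESOS
      case m if m.startsWith("local") => 8  // LOCAL
      case _ => sys.error("Master must start with yarn, spark, mesos, or local")
    }

    assert(managerFor("local[4]") == 8)
    assert(managerFor("spark://host:7077") == 2)
    assert(managerFor("yarn-cluster") == 1)
]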
- // For mesos, only client mode is supported
- if (clusterManager == MESOS && deployOnCluster) {
- printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+ // Set the deploy mode; default is client mode
+ var deployMode: Int = args.deployMode match {
+ case "client" | null => CLIENT
+ case "cluster" => CLUSTER
+ case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
- // For standalone, only client mode is supported
- if (clusterManager == STANDALONE && deployOnCluster) {
- printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.")
+ // Because "yarn-cluster" and "yarn-client" encapsulate both the master
+ // and deploy mode, we have some logic to infer the master and deploy mode
+ // from each other if only one is specified, or exit early if they are at odds.
+ if (clusterManager == YARN) {
+ if (args.master == "yarn-standalone") {
+ printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
+ args.master = "yarn-cluster"
+ }
+ (args.master, args.deployMode) match {
+ case ("yarn-cluster", null) =>
+ deployMode = CLUSTER
+ case ("yarn-cluster", "client") =>
+ printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
+ case ("yarn-client", "cluster") =>
+ printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
+ case (_, mode) =>
+ args.master = "yarn-" + Option(mode).getOrElse("client")
+ }
+
+ // Make sure YARN is included in our build if we're trying to use it
+ if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+ printErrorAndExit(
+ "Could not load YARN classes. " +
+ "This copy of Spark may not have been compiled with YARN support.")
+ }
}
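[Annotation: the reconciliation above can be summarized as a pure function. A sketch under the same rules — Either is used here only to make the failure cases explicit, whereas the patch prints an error and exits:

    def resolveYarn(master: String, deployMode: String): Either[String, (String, String)] = {
      val m = if (master == "yarn-standalone") "yarn-cluster" else master
      (m, deployMode) match {
        case ("yarn-cluster", null)     => Right(("yarn-cluster", "cluster"))
        case ("yarn-cluster", "client") => Left("client mode incompatible with yarn-cluster")
        case ("yarn-client", "cluster") => Left("cluster mode incompatible with yarn-client")
        case (_, mode) =>
          val resolved = Option(mode).getOrElse("client")
          Right(("yarn-" + resolved, resolved))
      }
    }

    assert(resolveYarn("yarn", null) == Right(("yarn-client", "client")))
    assert(resolveYarn("yarn-standalone", null) == Right(("yarn-cluster", "cluster")))
]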
- // For shells, only client mode is applicable
- if (isShell(args.primaryResource) && deployOnCluster) {
- printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+ // The following modes are not supported or applicable
+ (clusterManager, deployMode) match {
+ case (MESOS, CLUSTER) =>
+ printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+ case (STANDALONE, CLUSTER) =>
+ printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
+ case (_, CLUSTER) if args.isPython =>
+ printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+ case (_, CLUSTER) if isShell(args.primaryResource) =>
+ printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+ case _ =>
}
// If we're running a python app, set the main class to our specific python runner
- if (isPython) {
- if (deployOnCluster) {
- printErrorAndExit("Cluster deploy mode is currently not supported for python.")
- }
+ if (args.isPython) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -152,120 +158,115 @@ object SparkSubmit {
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
}
- // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
- if (!deployOnCluster) {
- childMainClass = args.mainClass
- if (isUserJar(args.primaryResource)) {
- childClasspath += args.primaryResource
- }
- } else if (clusterManager == YARN) {
- childMainClass = "org.apache.spark.deploy.yarn.Client"
- childArgs += ("--jar", args.primaryResource)
- childArgs += ("--class", args.mainClass)
- }
-
- // Make sure YARN is included in our build if we're trying to use it
- if (clusterManager == YARN) {
- if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
- printErrorAndExit("Could not load YARN classes. " +
- "This copy of Spark may not have been compiled with YARN support.")
- }
- }
-
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
- OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
- OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
- OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
- OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
+
+ // All cluster managers
+ OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
+ OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
+ OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+
+ // Standalone cluster only
+ OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
+ OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+
+ // Yarn client only
+ OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+ OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+ OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
+ OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
+ OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
+
+ // Yarn cluster only
+ OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+ OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+ OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
+ OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
+ OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
+ OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
+ OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
+ OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
+ OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+
+ // Other options
+ OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraClassPath"),
- OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
+ OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraJavaOptions"),
- OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
+ OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
- OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
- OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
- OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
- OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
- OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
- OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
- OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
- OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
- OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
+ OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
sysProp = "spark.executor.memory"),
- OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
- OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
- OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
+ OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
sysProp = "spark.cores.max"),
- OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
- OptionAssigner(args.files, YARN, true, clOption = "--files"),
- OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
- OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
- OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
- OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
- OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
- OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
+ sysProp = "spark.files")
)
- // For client mode make any added jars immediately visible on the classpath
- if (args.jars != null && !deployOnCluster) {
- for (jar <- args.jars.split(",")) {
- childClasspath += jar
+ // In client mode, launch the application main class directly
+ // In addition, add the main application jar and any added jars (if any) to the classpath
+ if (deployMode == CLIENT) {
+ childMainClass = args.mainClass
+ if (isUserJar(args.primaryResource)) {
+ childClasspath += args.primaryResource
}
+ if (args.jars != null) { childClasspath ++= args.jars.split(",") }
+ if (args.childArgs != null) { childArgs ++= args.childArgs }
}
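[Annotation: a worked example of the client-mode branch, with hypothetical jar names:

    val childClasspath = scala.collection.mutable.ArrayBuffer[String]()
    val primaryResource = "app.jar"   // hypothetical user jar
    val jars = "a.jar,b.jar"          // hypothetical --jars value

    childClasspath += primaryResource
    if (jars != null) { childClasspath ++= jars.split(",") }
    assert(childClasspath == Seq("app.jar", "a.jar", "b.jar"))
]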
+
// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
- if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
+ if (opt.value != null &&
+ (deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
- if (opt.clOption != null) {
- childArgs += (opt.clOption, opt.value)
- }
- if (opt.sysProp != null) {
- sysProps.put(opt.sysProp, opt.value)
- }
+ if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
+ if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
}
}
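[Annotation: putting the bit flags and this loop together, a self-contained sketch of how one rule fires in yarn-cluster mode; the flag values and the two sample assignments mirror the patch, while the "4g" value is made up:

    case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int,
        clOption: String = null, sysProp: String = null)

    val YARN = 1; val CLIENT = 1; val CLUSTER = 2
    val (clusterManager, deployMode) = (YARN, CLUSTER)

    val opts = Seq(
      OptionAssigner("4g", YARN, CLUSTER, clOption = "--driver-memory"),
      OptionAssigner("2", YARN, CLIENT, sysProp = "spark.executor.cores"))

    val childArgs = scala.collection.mutable.ArrayBuffer[String]()
    val sysProps = scala.collection.mutable.HashMap[String, String]()
    for (opt <- opts) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
      }
    }
    assert(childArgs == Seq("--driver-memory", "4g"))  // the CLIENT-only rule did not fire
    assert(sysProps.isEmpty)
]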
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
- if (!isYarnCluster && !isPython) {
- var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+ val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+ if (!isYarnCluster && !args.isPython) {
+ var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}
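[Annotation: a sketch of the spark.jars merge above, showing that the primary resource is appended to whatever --jars already contributed (jar names hypothetical):

    val sysProps = scala.collection.mutable.HashMap("spark.jars" -> "a.jar,b.jar")
    val primaryResource = "app.jar"
    var jars = sysProps.get("spark.jars").map(_.split(",").toSeq).getOrElse(Seq.empty)
    jars = jars ++ Seq(primaryResource)
    sysProps.put("spark.jars", jars.mkString(","))
    assert(sysProps("spark.jars") == "a.jar,b.jar,app.jar")
]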
- // Standalone cluster specific configurations
- if (deployOnCluster && clusterManager == STANDALONE) {
+ // In standalone-cluster mode, use Client as a wrapper around the user class
+ if (clusterManager == STANDALONE && deployMode == CLUSTER) {
+ childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
childArgs += "--supervise"
}
- childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
+ if (args.childArgs != null) {
+ childArgs ++= args.childArgs
+ }
}
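[Annotation: for reference, the child argument list this standalone-cluster branch assembles, with hypothetical values:

    val childArgs = scala.collection.mutable.ArrayBuffer[String]()
    val (master, primaryResource, mainClass) =
      ("spark://host:7077", "app.jar", "com.example.Main")  // hypothetical
    val supervise = true
    val userArgs = Seq("arg1")                              // hypothetical app args

    if (supervise) { childArgs += "--supervise" }
    childArgs += "launch"
    childArgs += (master, primaryResource, mainClass)
    childArgs ++= userArgs
    assert(childArgs == Seq("--supervise", "launch", "spark://host:7077",
      "app.jar", "com.example.Main", "arg1"))
]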
- // Arguments to be passed to user program
- if (args.childArgs != null) {
- if (!deployOnCluster || clusterManager == STANDALONE) {
- childArgs ++= args.childArgs
- } else if (clusterManager == YARN) {
- for (arg <- args.childArgs) {
- childArgs += ("--arg", arg)
- }
+ // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
+ if (clusterManager == YARN && deployMode == CLUSTER) {
+ childMainClass = "org.apache.spark.deploy.yarn.Client"
+ childArgs += ("--jar", args.primaryResource)
+ childArgs += ("--class", args.mainClass)
+ if (args.childArgs != null) {
+ args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
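[Annotation: and the equivalent for the yarn-cluster branch — user arguments are not appended verbatim but each wrapped in an --arg flag for yarn.Client (values hypothetical):

    val childArgs = scala.collection.mutable.ArrayBuffer[String]()
    val (primaryResource, mainClass) = ("app.jar", "com.example.Main")  // hypothetical
    val userArgs = Seq("arg1", "arg2")

    childArgs += ("--jar", primaryResource)
    childArgs += ("--class", mainClass)
    userArgs.foreach { arg => childArgs += ("--arg", arg) }
    assert(childArgs == Seq("--jar", "app.jar", "--class", "com.example.Main",
      "--arg", "arg1", "--arg", "arg2"))
]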
// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
- if (!sysProps.contains(k)) sysProps(k) = v
+ sysProps.getOrElseUpdate(k, v)
}
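[Annotation: getOrElseUpdate only fills in keys that are still missing, so properties set explicitly win over the defaults file. A small sketch with example keys and values:

    val sysProps = scala.collection.mutable.HashMap("spark.master" -> "yarn-client")
    val defaults = Map("spark.master" -> "local", "spark.ui.port" -> "4040")
    for ((k, v) <- defaults) { sysProps.getOrElseUpdate(k, v) }
    assert(sysProps("spark.master") == "yarn-client")  // explicit setting preserved
    assert(sysProps("spark.ui.port") == "4040")        // gap filled from defaults
]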
(childArgs, childClasspath, sysProps, childMainClass)
@@ -364,6 +365,6 @@ object SparkSubmit {
private[spark] case class OptionAssigner(
value: String,
clusterManager: Int,
- deployOnCluster: Boolean,
+ deployMode: Int,
clOption: String = null,
sysProp: String = null)