author     Sandy Ryza <sandy@cloudera.com>          2014-03-29 14:41:36 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-03-29 14:41:36 -0700
commit     1617816090e7b20124a512a43860a21232ebf511 (patch)
tree       cb6e45d21cb59edd81ab3bc29b9e00ab034bb90d /core
parent     3738f24421d6f3bd10e5ef9ebfc10f702a5cb7ac (diff)
SPARK-1126. spark-app preliminary
This is a starting version of the spark-app script for running compiled binaries against Spark. It still needs tests and some polish. The only testing I've done so far has been using it to launch jobs in yarn-standalone mode against a pseudo-distributed cluster.

This leaves out the changes required for launching python scripts. I think it might be best to save those for another JIRA/PR (while keeping to the design so that they won't require backwards-incompatible changes).

Author: Sandy Ryza <sandy@cloudera.com>

Closes #86 from sryza/sandy-spark-1126 and squashes the following commits:

d428d85 [Sandy Ryza] Commenting, doc, and import fixes from Patrick's comments
e7315c6 [Sandy Ryza] Fix failing tests
34de899 [Sandy Ryza] Change --more-jars to --jars and fix docs
299ddca [Sandy Ryza] Fix scalastyle
a94c627 [Sandy Ryza] Add newline at end of SparkSubmit
04bc4e2 [Sandy Ryza] SPARK-1126. spark-submit script
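For reference, a hypothetical invocation assembled from the usage text added in this patch (the jar name, main class, and argument below are illustrative placeholders, not part of the commit):

    spark-submit my-app.jar --class org.example.MyApp --master yarn --deploy-mode cluster \
      --num-executors 4 --executor-memory 2G --arg hdfs:///input/part-00000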
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala           212
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala  176
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala      121
3 files changed, 509 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
new file mode 100644
index 0000000000..24a9c98e18
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.net.URL
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+import org.apache.spark.executor.ExecutorURLClassLoader
+
+/**
+ * Scala code behind the spark-submit script. The script handles setting up the classpath with
+ * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
+ * modes that Spark supports.
+ */
+object SparkSubmit {
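+ // Cluster managers are represented as bit flags so that a single OptionAssigner entry
+ // below can target several managers at once (e.g. STANDALONE | MESOS).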
+ val YARN = 1
+ val STANDALONE = 2
+ val MESOS = 4
+ val LOCAL = 8
+ val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+
+ var clusterManager: Int = LOCAL
+
+ def main(args: Array[String]) {
+ val appArgs = new SparkSubmitArguments(args)
+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ launch(childArgs, classpath, sysProps, mainClass)
+ }
+
+ /**
+ * @return a tuple containing the arguments for the child, a list of classpath
+ * entries for the child, a map of system properties, and the main class for the child
+ */
+ def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+ ArrayBuffer[String], Map[String, String], String) = {
+ if (appArgs.master.startsWith("yarn")) {
+ clusterManager = YARN
+ } else if (appArgs.master.startsWith("spark")) {
+ clusterManager = STANDALONE
+ } else if (appArgs.master.startsWith("mesos")) {
+ clusterManager = MESOS
+ } else if (appArgs.master.startsWith("local")) {
+ clusterManager = LOCAL
+ } else {
+ System.err.println("master must start with yarn, mesos, spark, or local")
+ System.exit(1)
+ }
+
+ // Because "yarn-standalone" and "yarn-client" encapsulate both the master
+ // and deploy mode, we have some logic to infer the master and deploy mode
+ // from each other if only one is specified, or exit early if they are at odds.
+ if (appArgs.deployMode == null && appArgs.master == "yarn-standalone") {
+ appArgs.deployMode = "cluster"
+ }
+ if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
+ System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
+ System.exit(1)
+ }
+ if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
+ System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
+ System.exit(1)
+ }
+ if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
+ appArgs.master = "yarn-standalone"
+ }
+ if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
+ appArgs.master = "yarn-client"
+ }
+
+ val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"
+
+ val childClasspath = new ArrayBuffer[String]()
+ val childArgs = new ArrayBuffer[String]()
+ val sysProps = new HashMap[String, String]()
+ var childMainClass = ""
+
+ if (clusterManager == MESOS && deployOnCluster) {
+ System.err.println("Mesos does not support running the driver on the cluster")
+ System.exit(1)
+ }
+
+ if (!deployOnCluster) {
+ childMainClass = appArgs.mainClass
+ childClasspath += appArgs.primaryResource
+ } else if (clusterManager == YARN) {
+ childMainClass = "org.apache.spark.deploy.yarn.Client"
+ childArgs += ("--jar", appArgs.primaryResource)
+ childArgs += ("--class", appArgs.mainClass)
+ }
+
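+ // Rules that map each user-supplied argument onto either a command-line option for the
+ // child process or a Spark system property, depending on the cluster manager and
+ // deploy mode in use.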
+ val options = List[OptionAssigner](
+ new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+ new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
+ new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
+ new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
+ new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
+ new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
+ new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+ new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
+ new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+ sysProp = "spark.executor.memory"),
+ new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
+ new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
+ new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
+ new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+ new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+ sysProp = "spark.cores.max"),
+ new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+ new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
+ new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+ new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
+ new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars")
+ )
+
+ // When the driver runs locally, add any extra user jars directly to its classpath
+ if (appArgs.jars != null && !deployOnCluster) {
+ for (jar <- appArgs.jars.split(",")) {
+ childClasspath += jar
+ }
+ }
+
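+ // Apply each rule whose deploy mode and cluster manager match this launch, passing the
+ // value either on the child's command line or as a system property.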
+ for (opt <- options) {
+ if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
+ (clusterManager & opt.clusterManager) != 0) {
+ if (opt.clOption != null) {
+ childArgs += (opt.clOption, opt.value)
+ } else if (opt.sysProp != null) {
+ sysProps.put(opt.sysProp, opt.value)
+ }
+ }
+ }
+
+ if (deployOnCluster && clusterManager == STANDALONE) {
+ if (appArgs.supervise) {
+ childArgs += "--supervise"
+ }
+
+ childMainClass = "org.apache.spark.deploy.Client"
+ childArgs += "launch"
+ childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
+ }
+
+ // Forward the application arguments to the child process
+ if (appArgs.childArgs != null) {
+ if (!deployOnCluster || clusterManager == STANDALONE) {
+ childArgs ++= appArgs.childArgs
+ } else if (clusterManager == YARN) {
+ for (arg <- appArgs.childArgs) {
+ childArgs += ("--args", arg)
+ }
+ }
+ }
+
+ (childArgs, childClasspath, sysProps, childMainClass)
+ }
+
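+ /**
+ * Runs the child main class in the current JVM: adds the given jars to a fresh class
+ * loader, sets the given system properties, and then invokes the class's main method
+ * reflectively with the given arguments.
+ */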
+ def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
+ sysProps: Map[String, String], childMainClass: String) {
+ val loader = new ExecutorURLClassLoader(new Array[URL](0),
+ Thread.currentThread.getContextClassLoader)
+ Thread.currentThread.setContextClassLoader(loader)
+
+ for (jar <- childClasspath) {
+ addJarToClasspath(jar, loader)
+ }
+
+ for ((key, value) <- sysProps) {
+ System.setProperty(key, value)
+ }
+
+ val mainClass = Class.forName(childMainClass, true, loader)
+ val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
+ mainMethod.invoke(null, childArgs.toArray)
+ }
+
+ def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+ val localJarFile = new File(localJar)
+ if (!localJarFile.exists()) {
+ System.err.println("Jar does not exist: " + localJar + ". Skipping.")
+ } else {
+ val url = localJarFile.getAbsoluteFile.toURI.toURL
+ loader.addURL(url)
+ }
+ }
+}
+
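+/**
+ * Associates an argument value with the cluster managers and deploy mode it applies to,
+ * and with the command-line option or system property used to pass it to the child.
+ */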
+private[spark] class OptionAssigner(val value: String,
+ val clusterManager: Int,
+ val deployOnCluster: Boolean,
+ val clOption: String = null,
+ val sysProp: String = null
+) { }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
new file mode 100644
index 0000000000..ff2aa68908
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Parses and encapsulates arguments from the spark-submit script.
+ */
+private[spark] class SparkSubmitArguments(args: Array[String]) {
+ var master: String = "local"
+ var deployMode: String = null
+ var executorMemory: String = null
+ var executorCores: String = null
+ var totalExecutorCores: String = null
+ var driverMemory: String = null
+ var driverCores: String = null
+ var supervise: Boolean = false
+ var queue: String = null
+ var numExecutors: String = null
+ var files: String = null
+ var archives: String = null
+ var mainClass: String = null
+ var primaryResource: String = null
+ var name: String = null
+ var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
+ var jars: String = null
+
+ loadEnvVars()
+ parseArgs(args.toList)
+
+ def loadEnvVars() {
+ // Use the environment variables only when they are actually set, so the defaults survive
+ Option(System.getenv("MASTER")).foreach(v => master = v)
+ Option(System.getenv("DEPLOY_MODE")).foreach(v => deployMode = v)
+ }
+
+ def parseArgs(args: List[String]) {
+ if (args.size == 0) {
+ printUsageAndExit(1)
+ }
+ primaryResource = args(0)
+ parseOpts(args.tail)
+ }
+
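+ /**
+ * Consumes the option list recursively: each case peels off one option (plus its value,
+ * if it takes one) and recurses on the remaining tail until the list is exhausted.
+ */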
+ def parseOpts(opts: List[String]): Unit = opts match {
+ case ("--name") :: value :: tail =>
+ name = value
+ parseOpts(tail)
+
+ case ("--master") :: value :: tail =>
+ master = value
+ parseOpts(tail)
+
+ case ("--class") :: value :: tail =>
+ mainClass = value
+ parseOpts(tail)
+
+ case ("--deploy-mode") :: value :: tail =>
+ if (value != "client" && value != "cluster") {
+ System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
+ System.exit(1)
+ }
+ deployMode = value
+ parseOpts(tail)
+
+ case ("--num-executors") :: value :: tail =>
+ numExecutors = value
+ parseOpts(tail)
+
+ case ("--total-executor-cores") :: value :: tail =>
+ totalExecutorCores = value
+ parseOpts(tail)
+
+ case ("--executor-cores") :: value :: tail =>
+ executorCores = value
+ parseOpts(tail)
+
+ case ("--executor-memory") :: value :: tail =>
+ executorMemory = value
+ parseOpts(tail)
+
+ case ("--driver-memory") :: value :: tail =>
+ driverMemory = value
+ parseOpts(tail)
+
+ case ("--driver-cores") :: value :: tail =>
+ driverCores = value
+ parseOpts(tail)
+
+ case ("--supervise") :: tail =>
+ supervise = true
+ parseOpts(tail)
+
+ case ("--queue") :: value :: tail =>
+ queue = value
+ parseOpts(tail)
+
+ case ("--files") :: value :: tail =>
+ files = value
+ parseOpts(tail)
+
+ case ("--archives") :: value :: tail =>
+ archives = value
+ parseOpts(tail)
+
+ case ("--arg") :: value :: tail =>
+ childArgs += value
+ parseOpts(tail)
+
+ case ("--jars") :: value :: tail =>
+ jars = value
+ parseOpts(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil =>
+
+ case _ =>
+ printUsageAndExit(1, opts)
+ }
+
+ def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ if (unknownParam != null) {
+ System.err.println("Unknown/unsupported param " + unknownParam)
+ }
+ System.err.println(
+ """Usage: spark-submit <primary binary> [options]
+ |Options:
+ | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
+ | --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
+ | --class CLASS_NAME Name of your app's main class (required for Java apps).
+ | --arg ARG Argument to be passed to your application's main class. This
+ | option can be specified multiple times for multiple args.
+ | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
+ | --name NAME The name of your application (Default: 'Spark').
+ | --jars JARS Comma-separated list of local jars to include on the driver
+ | classpath and to make available to SparkContext.addJar.
+ | Doesn't work with the standalone 'cluster' deploy mode.
+ |
+ | Spark standalone with cluster deploy mode only:
+ | --driver-cores NUM Cores for driver (Default: 1).
+ | --supervise If given, restarts the driver on failure.
+ |
+ | Spark standalone and Mesos only:
+ | --total-executor-cores NUM Total cores for all executors.
+ |
+ | YARN-only:
+ | --executor-cores NUM Number of cores per executor (Default: 1).
+ | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
+ | --queue QUEUE_NAME The YARN queue to submit to (Default: 'default').
+ | --num-executors NUM Number of executors to launch (Default: 2).
+ | --files FILES Comma-separated list of files to be placed in the working dir
+ | of each executor.
+ | --archives ARCHIVES Comma-separated list of archives to be extracted into the
+ | working dir of each executor.""".stripMargin
+ )
+ System.exit(exitCode)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
new file mode 100644
index 0000000000..29fef2ed8c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.deploy.SparkSubmit._
+
+class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+ test("prints usage on empty input") {
+ val clArgs = Array[String]()
+ // val appArgs = new SparkSubmitArguments(clArgs)
+ }
+
+ test("handles YARN cluster mode") {
+ val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+ "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
+ "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
+ "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
+ "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+ "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val childArgsStr = childArgs.mkString(" ")
+ childArgsStr should include ("--jar thejar.jar")
+ childArgsStr should include ("--class org.SomeClass")
+ childArgsStr should include ("--addJars one.jar,two.jar,three.jar")
+ childArgsStr should include ("--executor-memory 5g")
+ childArgsStr should include ("--driver-memory 4g")
+ childArgsStr should include ("--executor-cores 5")
+ childArgsStr should include ("--args arg1 --args arg2")
+ childArgsStr should include ("--queue thequeue")
+ childArgsStr should include ("--files file1.txt,file2.txt")
+ childArgsStr should include ("--archives archive1.txt,archive2.txt")
+ childArgsStr should include ("--num-executors 6")
+ mainClass should be ("org.apache.spark.deploy.yarn.Client")
+ classpath should have length (0)
+ sysProps should have size (0)
+ }
+
+ test("handles YARN client mode") {
+ val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+ "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
+ "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
+ "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
+ "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+ "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ childArgs.mkString(" ") should be ("arg1 arg2")
+ mainClass should be ("org.SomeClass")
+ classpath should contain ("thejar.jar")
+ classpath should contain ("one.jar")
+ classpath should contain ("two.jar")
+ classpath should contain ("three.jar")
+ sysProps("spark.executor.memory") should be ("5g")
+ sysProps("spark.executor.cores") should be ("5")
+ sysProps("spark.yarn.queue") should be ("thequeue")
+ sysProps("spark.yarn.dist.files") should be ("file1.txt,file2.txt")
+ sysProps("spark.yarn.dist.archives") should be ("archive1.txt,archive2.txt")
+ sysProps("spark.executor.instances") should be ("6")
+ }
+
+ test("handles standalone cluster mode") {
+ val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+ "--master", "spark://h:p", "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
+ "--supervise", "--driver-memory", "4g", "--driver-cores", "5")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ val childArgsStr = childArgs.mkString(" ")
+ print("child args: " + childArgsStr)
+ childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
+ childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
+ mainClass should be ("org.apache.spark.deploy.Client")
+ classpath should have length (0)
+ sysProps should have size (0)
+ }
+
+ test("handles standalone client mode") {
+ val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+ "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
+ "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
+ "--driver-memory", "4g")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ childArgs.mkString(" ") should be ("arg1 arg2")
+ mainClass should be ("org.SomeClass")
+ classpath should contain ("thejar.jar")
+ sysProps("spark.executor.memory") should be ("5g")
+ sysProps("spark.cores.max") should be ("5")
+ }
+
+ test("handles mesos client mode") {
+ val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+ "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
+ "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
+ "--driver-memory", "4g")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+ childArgs.mkString(" ") should be ("arg1 arg2")
+ mainClass should be ("org.SomeClass")
+ classpath should contain ("thejar.jar")
+ sysProps("spark.executor.memory") should be ("5g")
+ sysProps("spark.cores.max") should be ("5")
+ }
+}