author     Patrick Wendell <pwendell@gmail.com>    2014-04-26 19:24:29 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-04-26 19:24:29 -0700
commit     aa9a7f5db7bffcdbcd7fd53694c606d3a8cdd21f (patch)
tree       2dd20bd11dd1673677c2b52a9aa3b5774b4f3c45
parent     762af4e9c2837c8ca125838b2ca01e328ee6669f (diff)
SPARK-1606: Infer user application arguments instead of requiring --arg.
This modifies spark-submit to do something more like the Hadoop `jar` command. Now we have the following syntax:

    ./bin/spark-submit [options] user.jar [user options]

Author: Patrick Wendell <pwendell@gmail.com>

Closes #563 from pwendell/spark-submit and squashes the following commits:

32241fc [Patrick Wendell] Review feedback
3adfb69 [Patrick Wendell] Small fix
bc48139 [Patrick Wendell] SPARK-1606: Infer user application arguments instead of requiring --arg.
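For illustration, a rough before/after of the same launch under the old and new argument styles; the jar name and the application arguments below are made up, not taken from the patch:

    # Before this change: the app jar came first and application arguments
    # had to be passed through repeated --arg flags
    ./bin/spark-submit my-app.jar --class my.main.ClassName --master local[4] --arg input.txt --arg output.txt

    # After this change: Spark options come first, then the app jar, and
    # everything after the jar is forwarded to the application's main class
    ./bin/spark-submit --class my.main.ClassName --master local[4] my-app.jar input.txt output.txt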
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala          |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 226
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                  |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala     |  77
-rw-r--r--  docs/cluster-overview.md                                               |  22
-rw-r--r--  docs/quick-start.md                                                    |  10
6 files changed, 181 insertions(+), 162 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 58aa6d951a..24edc60684 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -185,7 +185,6 @@ object SparkSubmit {
if (clusterManager == STANDALONE) {
val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
- println("SPARK JARS" + sysProps.get("spark.jars"))
}
if (deployOnCluster && clusterManager == STANDALONE) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c545b093ac..58d9e9add7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -21,14 +21,15 @@ import java.io.{File, FileInputStream, IOException}
import java.util.Properties
import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ArrayBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.SparkException
+import org.apache.spark.util.Utils
/**
* Parses and encapsulates arguments from the spark-submit script.
*/
-private[spark] class SparkSubmitArguments(args: Array[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -118,8 +119,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
- val testing = sys.env.contains("SPARK_TESTING")
- if (!hasHadoopEnv && !testing) {
+ if (!hasHadoopEnv && !Utils.isTesting) {
throw new Exception(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
@@ -156,119 +156,121 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
""".stripMargin
}
- private def parseOpts(opts: List[String]): Unit = opts match {
- case ("--name") :: value :: tail =>
- name = value
- parseOpts(tail)
+ /** Fill in values by parsing user options. */
+ private def parseOpts(opts: Seq[String]): Unit = {
+ // Delineates parsing of Spark options from parsing of user options.
+ var inSparkOpts = true
+ parse(opts)
- case ("--master") :: value :: tail =>
- master = value
- parseOpts(tail)
+ def parse(opts: Seq[String]): Unit = opts match {
+ case ("--name") :: value :: tail =>
+ name = value
+ parse(tail)
- case ("--class") :: value :: tail =>
- mainClass = value
- parseOpts(tail)
+ case ("--master") :: value :: tail =>
+ master = value
+ parse(tail)
- case ("--deploy-mode") :: value :: tail =>
- if (value != "client" && value != "cluster") {
- SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
- }
- deployMode = value
- parseOpts(tail)
-
- case ("--num-executors") :: value :: tail =>
- numExecutors = value
- parseOpts(tail)
-
- case ("--total-executor-cores") :: value :: tail =>
- totalExecutorCores = value
- parseOpts(tail)
-
- case ("--executor-cores") :: value :: tail =>
- executorCores = value
- parseOpts(tail)
-
- case ("--executor-memory") :: value :: tail =>
- executorMemory = value
- parseOpts(tail)
-
- case ("--driver-memory") :: value :: tail =>
- driverMemory = value
- parseOpts(tail)
-
- case ("--driver-cores") :: value :: tail =>
- driverCores = value
- parseOpts(tail)
-
- case ("--driver-class-path") :: value :: tail =>
- driverExtraClassPath = value
- parseOpts(tail)
-
- case ("--driver-java-options") :: value :: tail =>
- driverExtraJavaOptions = value
- parseOpts(tail)
-
- case ("--driver-library-path") :: value :: tail =>
- driverExtraLibraryPath = value
- parseOpts(tail)
-
- case ("--properties-file") :: value :: tail =>
- propertiesFile = value
- parseOpts(tail)
-
- case ("--supervise") :: tail =>
- supervise = true
- parseOpts(tail)
-
- case ("--queue") :: value :: tail =>
- queue = value
- parseOpts(tail)
-
- case ("--files") :: value :: tail =>
- files = value
- parseOpts(tail)
-
- case ("--archives") :: value :: tail =>
- archives = value
- parseOpts(tail)
-
- case ("--arg") :: value :: tail =>
- childArgs += value
- parseOpts(tail)
-
- case ("--jars") :: value :: tail =>
- jars = value
- parseOpts(tail)
-
- case ("--help" | "-h") :: tail =>
- printUsageAndExit(0)
-
- case ("--verbose" | "-v") :: tail =>
- verbose = true
- parseOpts(tail)
-
- case value :: tail =>
- if (value.startsWith("-")) {
- val errMessage = s"Unrecognized option '$value'."
- val suggestion: Option[String] = value match {
- case v if v.startsWith("--") && v.contains("=") =>
- val parts = v.split("=")
- Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
- case _ =>
- None
+ case ("--class") :: value :: tail =>
+ mainClass = value
+ parse(tail)
+
+ case ("--deploy-mode") :: value :: tail =>
+ if (value != "client" && value != "cluster") {
+ SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+ }
+ deployMode = value
+ parse(tail)
+
+ case ("--num-executors") :: value :: tail =>
+ numExecutors = value
+ parse(tail)
+
+ case ("--total-executor-cores") :: value :: tail =>
+ totalExecutorCores = value
+ parse(tail)
+
+ case ("--executor-cores") :: value :: tail =>
+ executorCores = value
+ parse(tail)
+
+ case ("--executor-memory") :: value :: tail =>
+ executorMemory = value
+ parse(tail)
+
+ case ("--driver-memory") :: value :: tail =>
+ driverMemory = value
+ parse(tail)
+
+ case ("--driver-cores") :: value :: tail =>
+ driverCores = value
+ parse(tail)
+
+ case ("--driver-class-path") :: value :: tail =>
+ driverExtraClassPath = value
+ parse(tail)
+
+ case ("--driver-java-options") :: value :: tail =>
+ driverExtraJavaOptions = value
+ parse(tail)
+
+ case ("--driver-library-path") :: value :: tail =>
+ driverExtraLibraryPath = value
+ parse(tail)
+
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parse(tail)
+
+ case ("--supervise") :: tail =>
+ supervise = true
+ parse(tail)
+
+ case ("--queue") :: value :: tail =>
+ queue = value
+ parse(tail)
+
+ case ("--files") :: value :: tail =>
+ files = value
+ parse(tail)
+
+ case ("--archives") :: value :: tail =>
+ archives = value
+ parse(tail)
+
+ case ("--jars") :: value :: tail =>
+ jars = value
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case ("--verbose" | "-v") :: tail =>
+ verbose = true
+ parse(tail)
+
+ case value :: tail =>
+ if (inSparkOpts) {
+ value match {
+ // convert --foo=bar to --foo bar
+ case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
+ val parts = v.split("=")
+ parse(Seq(parts(0), parts(1)) ++ tail)
+ case v if v.startsWith("-") =>
+ val errMessage = s"Unrecognized option '$value'."
+ SparkSubmit.printErrorAndExit(errMessage)
+ case v =>
+ primaryResource = v
+ inSparkOpts = false
+ parse(tail)
+ }
+ } else {
+ childArgs += value
+ parse(tail)
}
- SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
- }
- if (primaryResource != null) {
- val error = s"Found two conflicting resources, $value and $primaryResource." +
- " Expecting only one resource."
- SparkSubmit.printErrorAndExit(error)
+ case Nil =>
}
- primaryResource = value
- parseOpts(tail)
-
- case Nil =>
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -277,7 +279,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
outStream.println("Unknown/unsupported param " + unknownParam)
}
outStream.println(
- """Usage: spark-submit <app jar> [options]
+ """Usage: spark-submit [options] <app jar> [app options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
| --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8351f7156a..5a55e7df34 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1056,4 +1056,11 @@ private[spark] object Utils extends Logging {
def getHadoopFileSystem(path: String): FileSystem = {
getHadoopFileSystem(new URI(path))
}
+
+ /**
+ * Indicates whether Spark is currently running unit tests.
+ */
+ private[spark] def isTesting = {
+ sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 657b44668d..10a65c75cc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -28,6 +28,9 @@ import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+ def beforeAll() {
+ System.setProperty("spark.testing", "true")
+ }
val noOpOutputStream = new OutputStream {
def write(b: Int) = {}
@@ -74,33 +77,35 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
testPrematureExit(Array("--help"), "Usage: spark-submit")
}
- test("prints error with unrecognized option") {
+ test("prints error with unrecognized options") {
testPrematureExit(Array("--blarg"), "Unrecognized option '--blarg'")
testPrematureExit(Array("-bleg"), "Unrecognized option '-bleg'")
- testPrematureExit(Array("--master=abc"),
- "Unrecognized option '--master=abc'. Perhaps you want '--master abc'?")
}
- test("handles multiple binary definitions") {
- val adjacentJars = Array("foo.jar", "bar.jar")
- testPrematureExit(adjacentJars, "error: Found two conflicting resources")
+ test("handle binary specified but not class") {
+ testPrematureExit(Array("foo.jar"), "Must specify a main class")
+ }
- val nonAdjacentJars =
- Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
- testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources")
+ test("handles arguments with --key=val") {
+ val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ appArgs.jars should be ("one.jar,two.jar,three.jar")
+ appArgs.name should be ("myApp")
}
- test("handle binary specified but not class") {
- testPrematureExit(Array("foo.jar"), "Must specify a main class")
+ test("handles arguments to user program") {
+ val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
}
test("handles YARN cluster mode") {
- val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+ val clArgs = Seq("--deploy-mode", "cluster",
"--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
"--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
- "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
- "--queue", "thequeue", "--files", "file1.txt,file2.txt",
- "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+ "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+ "--archives", "archive1.txt,archive2.txt", "--num-executors", "6",
+ "thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
@@ -121,12 +126,12 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles YARN client mode") {
- val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+ val clArgs = Seq("--deploy-mode", "client",
"--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
"--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
- "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
- "--queue", "thequeue", "--files", "file1.txt,file2.txt",
- "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+ "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+ "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar",
+ "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -144,9 +149,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles standalone cluster mode") {
- val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
- "--master", "spark://h:p", "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
- "--supervise", "--driver-memory", "4g", "--driver-cores", "5")
+ val clArgs = Seq("--deploy-mode", "cluster",
+ "--master", "spark://h:p", "--class", "org.SomeClass",
+ "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
@@ -158,10 +163,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles standalone client mode") {
- val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+ val clArgs = Seq("--deploy-mode", "client",
"--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
- "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
- "--driver-memory", "4g")
+ "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -172,10 +176,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles mesos client mode") {
- val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+ val clArgs = Seq("--deploy-mode", "client",
"--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
- "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
- "--driver-memory", "4g")
+ "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -187,22 +190,24 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
test("launch simple application with spark-submit") {
runSparkSubmit(
- Seq("unUsed.jar",
+ Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
- "--master", "local"))
+ "--master", "local",
+ "unUsed.jar"))
}
test("spark submit includes jars passed in through --jar") {
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
- runSparkSubmit(
- Seq("unUsed.jar",
- "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
- "--name", "testApp",
- "--master", "local-cluster[2,1,512]",
- "--jars", jarsString))
+ val args = Seq(
+ "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local-cluster[2,1,512]",
+ "--jars", jarsString,
+ "unused.jar")
+ runSparkSubmit(args)
}
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index dcc0630426..b011679fed 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -73,30 +73,34 @@ the bin directory. This script takes care of setting up the classpath with Spark
dependencies, and can support different cluster managers and deploy modes that Spark supports.
It's usage is
- ./bin/spark-submit <app jar> --class path.to.your.Class [other options..]
+ ./bin/spark-submit --class path.to.your.Class [options] <app jar> [app options]
-To enumerate all options available to `spark-submit` run it with the `--help` flag.
-Here are a few examples of common options:
+When calling `spark-submit`, `[app options]` will be passed along to your application's
+main class. To enumerate all options available to `spark-submit` run it with
+the `--help` flag. Here are a few examples of common options:
{% highlight bash %}
# Run application locally
-./bin/spark-submit my-app.jar \
+./bin/spark-submit \
--class my.main.ClassName
- --master local[8]
+ --master local[8] \
+ my-app.jar
# Run on a Spark cluster
-./bin/spark-submit my-app.jar \
+./bin/spark-submit \
--class my.main.ClassName
--master spark://mycluster:7077 \
--executor-memory 20G \
- --total-executor-cores 100
+ --total-executor-cores 100 \
+ my-app.jar
# Run on a YARN cluster
-HADOOP_CONF_DIR=XX /bin/spark-submit my-app.jar \
+HADOOP_CONF_DIR=XX /bin/spark-submit \
--class my.main.ClassName
--master yarn-cluster \ # can also be `yarn-client` for client mode
--executor-memory 20G \
- --num-executors 50
+ --num-executors 50 \
+ my-app.jar
{% endhighlight %}
### Loading Configurations from a File
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 68afa6e1bf..64996b52e0 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -179,9 +179,10 @@ $ sbt package
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
-$ YOUR_SPARK_HOME/bin/spark-submit target/scala-2.10/simple-project_2.10-1.0.jar \
+$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
- --master local[4]
+ --master local[4] \
+ target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}
@@ -272,9 +273,10 @@ $ mvn package
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
-$ YOUR_SPARK_HOME/bin/spark-submit target/simple-project-1.0.jar \
+$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
- --master local[4]
+ --master local[4] \
+ target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}