author      Andrew Or <andrewor14@gmail.com>        2014-05-10 20:58:02 -0700
committer   Patrick Wendell <pwendell@gmail.com>    2014-05-10 20:58:02 -0700
commit      83e0424d87022e7a967088365931a08aa06ffd9f (patch)
tree        0bca98b588dfe7d553bf6c4afc366fb245aad56a /core
parent      2b7bd29eb6ee5baf739eec143044ecfc296b9b1f (diff)
[SPARK-1774] Respect SparkSubmit --jars on YARN (client)
SparkSubmit ignores `--jars` for YARN client. This is a bug.

This PR also automatically adds the application jar to `spark.jars`. Previously, when running as yarn-client, you had to specify the jar separately through `--files` (because `--jars` didn't work). Now you don't have to specify it explicitly through either. Tested on a YARN cluster.

Author: Andrew Or <andrewor14@gmail.com>

Closes #710 from andrewor14/yarn-jars and squashes the following commits:

35d1928 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-jars
c27bf6c [Andrew Or] For yarn-cluster and python, do not add primaryResource to spark.jar
c92c5bf [Andrew Or] Minor cleanups
269f9f3 [Andrew Or] Fix format
013d840 [Andrew Or] Fix tests
1407474 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-jars
3bb75e8 [Andrew Or] Allow SparkSubmit --jars to take effect in yarn-client mode
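For illustration only (not part of this patch): a minimal Scala sketch of the client-mode behavior described above, where the application jar is appended to whatever `--jars` already contributed to `spark.jars`, except in yarn-cluster and Python runs. The helper name `mergeUserJars` and its signature are hypothetical, not the actual SparkSubmit API.

    // Hypothetical sketch: append the application jar to the comma-separated
    // spark.jars value, mirroring the behavior described in the commit message.
    def mergeUserJars(
        existingJars: Option[String],   // value of "spark.jars" built from --jars, if any
        primaryResource: String,        // the application jar passed on the command line
        isYarnCluster: Boolean,
        isPython: Boolean): Option[String] = {
      if (isYarnCluster || isPython) {
        // yarn-cluster already distributes the jar as "app.jar";
        // Python apps have no primary jar to add
        existingJars
      } else {
        val jars = existingJars.map(_.split(",").toSeq).getOrElse(Seq.empty)
        Some((jars :+ primaryResource).mkString(","))
      }
    }

    // e.g. mergeUserJars(Some("one.jar,two.jar,three.jar"), "thejar.jar", false, false)
    //      returns Some("one.jar,two.jar,three.jar,thejar.jar"),
    //      matching the yarn-client assertion in SparkSubmitSuite below.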
Diffstat (limited to 'core')
-rw-r--r--   core/src/main/scala/org/apache/spark/SparkContext.scala               2
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala        39
-rw-r--r--   core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  110
3 files changed, 101 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c639b3e15d..71bab29544 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
- // --addjars option to the client to upload the file into the distributed cache
+ // --addJars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 16de6f7cdb..c6d3cbd2e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -67,8 +67,7 @@ object SparkSubmit {
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
/**
- * @return
- * a tuple containing the arguments for the child, a list of classpath
+ * @return a tuple containing the arguments for the child, a list of classpath
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
@@ -115,13 +114,16 @@ object SparkSubmit {
val sysProps = new HashMap[String, String]()
var childMainClass = ""
+ val isPython = args.isPython
+ val isYarnCluster = clusterManager == YARN && deployOnCluster
+
if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}
// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
- if (args.isPython) {
+ if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
@@ -161,6 +163,7 @@ object SparkSubmit {
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
+ OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
@@ -168,7 +171,8 @@ object SparkSubmit {
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
- OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
+ OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+ OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
@@ -176,20 +180,18 @@ object SparkSubmit {
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
- OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
- OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
- OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
- OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
- OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
+ OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)
// For client mode make any added jars immediately visible on the classpath
@@ -212,9 +214,10 @@ object SparkSubmit {
}
}
- // For standalone mode, add the application jar automatically so the user doesn't have to
- // call sc.addJar. TODO: Standalone mode in the cluster
- if (clusterManager == STANDALONE) {
+ // Add the application jar automatically so the user doesn't have to call sc.addJar
+ // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
+ // For python files, the primary resource is already distributed as a regular file
+ if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
jars = jars ++ Seq(args.primaryResource)
@@ -222,11 +225,11 @@ object SparkSubmit {
sysProps.put("spark.jars", jars.mkString(","))
}
+ // Standalone cluster specific configurations
if (deployOnCluster && clusterManager == STANDALONE) {
if (args.supervise) {
childArgs += "--supervise"
}
-
childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
@@ -243,6 +246,7 @@ object SparkSubmit {
}
}
+ // Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}
@@ -250,9 +254,12 @@ object SparkSubmit {
(childArgs, childClasspath, sysProps, childMainClass)
}
- private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
- sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
- {
+ private def launch(
+ childArgs: ArrayBuffer[String],
+ childClasspath: ArrayBuffer[String],
+ sysProps: Map[String, String],
+ childMainClass: String,
+ verbose: Boolean = false) {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c9edb03cde..6c0deede53 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -87,25 +87,41 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles arguments with --key=val") {
- val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
+ val clArgs = Seq(
+ "--jars=one.jar,two.jar,three.jar",
+ "--name=myApp")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.jars should be ("one.jar,two.jar,three.jar")
appArgs.name should be ("myApp")
}
test("handles arguments to user program") {
- val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
+ val clArgs = Seq(
+ "--name", "myApp",
+ "--class", "Foo",
+ "userjar.jar",
+ "some",
+ "--weird", "args")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.childArgs should be (Seq("some", "--weird", "args"))
}
test("handles YARN cluster mode") {
- val clArgs = Seq("--deploy-mode", "cluster",
- "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
- "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
- "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
- "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
- "thejar.jar", "arg1", "arg2")
+ val clArgs = Seq(
+ "--deploy-mode", "cluster",
+ "--master", "yarn",
+ "--executor-memory", "5g",
+ "--executor-cores", "5",
+ "--class", "org.SomeClass",
+ "--jars", "one.jar,two.jar,three.jar",
+ "--driver-memory", "4g",
+ "--queue", "thequeue",
+ "--files", "file1.txt,file2.txt",
+ "--archives", "archive1.txt,archive2.txt",
+ "--num-executors", "6",
+ "--name", "beauty",
+ "thejar.jar",
+ "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
@@ -127,12 +143,21 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles YARN client mode") {
- val clArgs = Seq("--deploy-mode", "client",
- "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
- "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
- "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
- "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
- "thejar.jar", "arg1", "arg2")
+ val clArgs = Seq(
+ "--deploy-mode", "client",
+ "--master", "yarn",
+ "--executor-memory", "5g",
+ "--executor-cores", "5",
+ "--class", "org.SomeClass",
+ "--jars", "one.jar,two.jar,three.jar",
+ "--driver-memory", "4g",
+ "--queue", "thequeue",
+ "--files", "file1.txt,file2.txt",
+ "--archives", "archive1.txt,archive2.txt",
+ "--num-executors", "6",
+ "--name", "trill",
+ "thejar.jar",
+ "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -142,6 +167,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
classpath should contain ("two.jar")
classpath should contain ("three.jar")
sysProps("spark.app.name") should be ("trill")
+ sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.executor.cores") should be ("5")
sysProps("spark.yarn.queue") should be ("thequeue")
@@ -152,9 +178,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles standalone cluster mode") {
- val clArgs = Seq("--deploy-mode", "cluster",
- "--master", "spark://h:p", "--class", "org.SomeClass",
- "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
+ val clArgs = Seq(
+ "--deploy-mode", "cluster",
+ "--master", "spark://h:p",
+ "--class", "org.SomeClass",
+ "--supervise",
+ "--driver-memory", "4g",
+ "--driver-cores", "5",
+ "thejar.jar",
+ "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
@@ -166,9 +198,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles standalone client mode") {
- val clArgs = Seq("--deploy-mode", "client",
- "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
- "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
+ val clArgs = Seq(
+ "--deploy-mode", "client",
+ "--master", "spark://h:p",
+ "--executor-memory", "5g",
+ "--total-executor-cores", "5",
+ "--class", "org.SomeClass",
+ "--driver-memory", "4g",
+ "thejar.jar",
+ "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -179,9 +217,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("handles mesos client mode") {
- val clArgs = Seq("--deploy-mode", "client",
- "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
- "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
+ val clArgs = Seq(
+ "--deploy-mode", "client",
+ "--master", "mesos://h:p",
+ "--executor-memory", "5g",
+ "--total-executor-cores", "5",
+ "--class", "org.SomeClass",
+ "--driver-memory", "4g",
+ "thejar.jar",
+ "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -192,15 +236,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
test("launch simple application with spark-submit") {
- runSparkSubmit(
- Seq(
- "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
- "--name", "testApp",
- "--master", "local",
- "unUsed.jar"))
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local",
+ unusedJar.toString)
+ runSparkSubmit(args)
}
test("spark submit includes jars passed in through --jar") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
@@ -209,7 +255,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
"--name", "testApp",
"--master", "local-cluster[2,1,512]",
"--jars", jarsString,
- "unused.jar")
+ unusedJar.toString)
runSparkSubmit(args)
}
@@ -227,7 +273,7 @@ object JarCreationTest {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
- val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x =>
+ val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
var foundClasses = false
try {
Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
@@ -248,7 +294,6 @@ object SimpleApplicationTest {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
-
val configs = Seq("spark.master", "spark.app.name")
for (config <- configs) {
val masterValue = conf.get(config)
@@ -266,6 +311,5 @@ object SimpleApplicationTest {
s"Master had $config=$masterValue but executor had $config=$executorValue")
}
}
-
}
}