aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2015-12-15 18:24:23 -0800
committerAndrew Or <andrew@databricks.com>2015-12-15 18:24:23 -0800
commit63ccdef81329e785807f37b4e918a9247fc70e3c (patch)
tree593d610979db1106498066faf821bf6839508060
parent765a488494dac0ed38d2b81742c06467b79d96b2 (diff)
downloadspark-63ccdef81329e785807f37b4e918a9247fc70e3c.tar.gz
spark-63ccdef81329e785807f37b4e918a9247fc70e3c.tar.bz2
spark-63ccdef81329e785807f37b4e918a9247fc70e3c.zip
[SPARK-10123][DEPLOY] Support specifying deploy mode from configuration
Please help to review, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #10195 from jerryshao/SPARK-10123.
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala41
-rw-r--r--docs/configuration.md15
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java3
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java7
5 files changed, 64 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 18a1c52ae5..915ef81b4e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -176,7 +176,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
packagesExclusions = Option(packagesExclusions)
.orElse(sparkProperties.get("spark.jars.excludes")).orNull
- deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
+ deployMode = Option(deployMode)
+ .orElse(sparkProperties.get("spark.submit.deployMode"))
+ .orElse(env.get("DEPLOY_MODE"))
+ .orNull
numExecutors = Option(numExecutors)
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index d494b0caab..2626f5a16d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -136,6 +136,47 @@ class SparkSubmitSuite
appArgs.childArgs should be (Seq("--master", "local", "some", "--weird", "args"))
}
+ test("specify deploy mode through configuration") {
+ val clArgs = Seq(
+ "--master", "yarn",
+ "--conf", "spark.submit.deployMode=client",
+ "--class", "org.SomeClass",
+ "thejar.jar"
+ )
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (_, _, sysProps, _) = prepareSubmitEnvironment(appArgs)
+
+ appArgs.deployMode should be ("client")
+ sysProps("spark.submit.deployMode") should be ("client")
+
+ // When both the command line and the configuration are specified, the command-line option takes priority
+ val clArgs1 = Seq(
+ "--master", "yarn",
+ "--deploy-mode", "cluster",
+ "--conf", "spark.submit.deployMode=client",
+ "--class", "org.SomeClass",
+ "thejar.jar"
+ )
+ val appArgs1 = new SparkSubmitArguments(clArgs1)
+ val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1)
+
+ appArgs1.deployMode should be ("cluster")
+ sysProps1("spark.submit.deployMode") should be ("cluster")
+
+ // When neither the command line nor the configuration is specified, client mode is the default choice
+ val clArgs2 = Seq(
+ "--master", "yarn",
+ "--class", "org.SomeClass",
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ appArgs2.deployMode should be (null)
+
+ val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2)
+ appArgs2.deployMode should be ("client")
+ sysProps2("spark.submit.deployMode") should be ("client")
+ }
+
test("handles YARN cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
diff --git a/docs/configuration.md b/docs/configuration.md
index 55cf4b2dac..38d3d059f9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -48,7 +48,7 @@ The following format is accepted:
1y (years)
-Properties that specify a byte size should be configured with a unit of size.
+Properties that specify a byte size should be configured with a unit of size.
The following format is accepted:
1b (bytes)
@@ -192,6 +192,15 @@ of the most common options to set are:
<a href="submitting-applications.html#master-urls"> allowed master URL's</a>.
</td>
</tr>
+<tr>
+ <td><code>spark.submit.deployMode</code></td>
+ <td>(none)</td>
+ <td>
+ The deploy mode of the Spark driver program, either "client" or "cluster",
+ which means to launch the driver program locally ("client")
+ or remotely ("cluster") on one of the nodes inside the cluster.
+ </td>
+</tr>
</table>
Apart from these, the following properties are also available, and may be useful in some situations:
@@ -1095,7 +1104,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.rpc.lookupTimeout</code></td>
<td>120s</td>
<td>
- Duration for an RPC remote endpoint lookup operation to wait before timing out.
+ Duration for an RPC remote endpoint lookup operation to wait before timing out.
</td>
</tr>
</table>
@@ -1559,7 +1568,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.streaming.stopGracefullyOnShutdown</code></td>
<td>false</td>
<td>
- If <code>true</code>, Spark shuts down the <code>StreamingContext</code> gracefully on JVM
+ If <code>true</code>, Spark shuts down the <code>StreamingContext</code> gracefully on JVM
shutdown rather than immediately.
</td>
</tr>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index dd1c93af6c..20e6003a00 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -40,6 +40,9 @@ public class SparkLauncher {
/** The Spark master. */
public static final String SPARK_MASTER = "spark.master";
+ /** The Spark deploy mode. */
+ public static final String DEPLOY_MODE = "spark.submit.deployMode";
+
/** Configuration key for the driver memory. */
public static final String DRIVER_MEMORY = "spark.driver.memory";
/** Configuration key for the driver class path. */
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 312df0b269..a95f0f1751 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -294,10 +294,11 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
private boolean isClientMode(Map<String, String> userProps) {
String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
- // Default master is "local[*]", so assume client mode in that case.
+ String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
+ // Default master is "local[*]", so assume client mode in that case
return userMaster == null ||
- "client".equals(deployMode) ||
- (!userMaster.equals("yarn-cluster") && deployMode == null);
+ "client".equals(userDeployMode) ||
+ (!userMaster.equals("yarn-cluster") && userDeployMode == null);
}
/**