author     Marcelo Vanzin <vanzin@cloudera.com>   2015-08-11 16:33:08 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>   2015-08-11 16:33:08 -0700
commit     5a5bbc29961630d649d4bd4acd5d19eb537b5fd0 (patch)
tree       d8ff8381cb690dbcc8d4f4b65650635883342879 /launcher
parent     736af95bd0c41723d455246b634a0fb68b38a7c7 (diff)
[SPARK-9074] [LAUNCHER] Allow arbitrary Spark args to be set.

This change allows any Spark argument to be added to the app to be
started using SparkLauncher. Known arguments are properly validated,
while unknown arguments are allowed so that the library can launch
newer Spark versions (in case SPARK_HOME points at one).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7975 from vanzin/SPARK-9074 and squashes the following commits:

b5e451a [Marcelo Vanzin] [SPARK-9074] [launcher] Allow arbitrary Spark args to be set.
Diffstat (limited to 'launcher')
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java             101
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java    2
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java          50
3 files changed, 150 insertions(+), 3 deletions(-)
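
Taken together, the patch adds two addSparkArg overloads to SparkLauncher, shown in the diff below. A minimal usage sketch; the jar path, main class, and master URL here are placeholder values, not part of the patch:

    import org.apache.spark.launcher.SparkLauncher;

    public class LauncherExample {
      public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
            .setAppResource("/path/to/app.jar")        // placeholder application jar
            .setMainClass("com.example.MyApp")         // placeholder main class
            .setMaster("local[2]")                     // placeholder master URL
            .addSparkArg("--verbose")                  // known no-value argument, validated
            .addSparkArg("--driver-memory", "2g")      // known argument with a value, validated
            .addSparkArg("--future-argument", "value") // unknown argument, passed through as-is
            .launch();
        spark.waitFor();
      }
    }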
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index c0f89c9230..03c9358bc8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -20,12 +20,13 @@ package org.apache.spark.launcher;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
-/**
+/**
* Launcher for Spark applications.
* <p>
* Use this class to start Spark applications programmatically. The class uses a builder pattern
@@ -57,7 +58,8 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";
- private final SparkSubmitCommandBuilder builder;
+ // Visible for testing.
+ final SparkSubmitCommandBuilder builder;
public SparkLauncher() {
this(null);
@@ -188,6 +190,73 @@ public class SparkLauncher {
}
/**
+ * Adds a no-value argument to the Spark invocation. If the argument is known, this method
+ * validates whether the argument is indeed a no-value argument, and throws an exception
+ * otherwise.
+ * <p/>
+ * Use this method with caution. It is possible to create an invalid Spark command by passing
+ * unknown arguments to this method, since those are allowed for forward compatibility.
+ *
+ * @param arg Argument to add.
+ * @return This launcher.
+ */
+ public SparkLauncher addSparkArg(String arg) {
+ SparkSubmitOptionParser validator = new ArgumentValidator(false);
+ validator.parse(Arrays.asList(arg));
+ builder.sparkArgs.add(arg);
+ return this;
+ }
+
+ /**
+ * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
+ * a known argument, the code validates that the argument actually expects a value, and throws
+ * an exception otherwise.
+ * <p/>
+ * It is safe to add arguments modified by other methods in this class (such as
+ * {@link #setMaster(String)}) - the last invocation will be the one to take effect.
+ * <p/>
+ * Use this method with caution. It is possible to create an invalid Spark command by passing
+ * unknown arguments to this method, since those are allowed for forward compatibility.
+ *
+ * @param name Name of argument to add.
+ * @param value Value of the argument.
+ * @return This launcher.
+ */
+ public SparkLauncher addSparkArg(String name, String value) {
+ SparkSubmitOptionParser validator = new ArgumentValidator(true);
+ if (validator.MASTER.equals(name)) {
+ setMaster(value);
+ } else if (validator.PROPERTIES_FILE.equals(name)) {
+ setPropertiesFile(value);
+ } else if (validator.CONF.equals(name)) {
+ String[] vals = value.split("=", 2);
+ setConf(vals[0], vals[1]);
+ } else if (validator.CLASS.equals(name)) {
+ setMainClass(value);
+ } else if (validator.JARS.equals(name)) {
+ builder.jars.clear();
+ for (String jar : value.split(",")) {
+ addJar(jar);
+ }
+ } else if (validator.FILES.equals(name)) {
+ builder.files.clear();
+ for (String file : value.split(",")) {
+ addFile(file);
+ }
+ } else if (validator.PY_FILES.equals(name)) {
+ builder.pyFiles.clear();
+ for (String file : value.split(",")) {
+ addPyFile(file);
+ }
+ } else {
+ validator.parse(Arrays.asList(name, value));
+ builder.sparkArgs.add(name);
+ builder.sparkArgs.add(value);
+ }
+ return this;
+ }
+
+ /**
* Adds command line arguments for the application.
*
* @param args Arguments to pass to the application's main class.
@@ -277,4 +346,32 @@ public class SparkLauncher {
return pb.start();
}
+ private static class ArgumentValidator extends SparkSubmitOptionParser {
+
+ private final boolean hasValue;
+
+ ArgumentValidator(boolean hasValue) {
+ this.hasValue = hasValue;
+ }
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ if (value == null && hasValue) {
+ throw new IllegalArgumentException(String.format("'%s' does not expect a value.", opt));
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
+ return true;
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+ // No op.
+ }
+
+ };
+
}
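
In effect, ArgumentValidator gives addSparkArg three behaviors: known no-value switches are accepted bare but reject a supplied value, known value-taking options are accepted together with a value, and unknown options always pass through. A short behavioral sketch, using hypothetical option values:

    SparkLauncher launcher = new SparkLauncher();

    // Known no-value switch: accepted as-is.
    launcher.addSparkArg("--verbose");

    // Known option that takes a value: accepted when the value is supplied.
    launcher.addSparkArg("--name", "MyApp");

    // Unknown option: accepted, for forward compatibility with newer Spark versions.
    launcher.addSparkArg("--future-flag");

    // Known switch given a value it does not take: rejected by the validator.
    try {
      launcher.addSparkArg("--verbose", "true");
    } catch (IllegalArgumentException e) {
      // "'--verbose' does not expect a value."
    }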
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 87c43aa998..4f354cedee 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -76,7 +76,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
"spark-internal");
}
- private final List<String> sparkArgs;
+ final List<String> sparkArgs;
private final boolean printHelp;
/**
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 252d5abae1..d0c26dd056 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.launcher;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -36,7 +37,53 @@ public class SparkLauncherSuite {
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
@Test
+ public void testSparkArgumentHandling() throws Exception {
+ SparkLauncher launcher = new SparkLauncher()
+ .setSparkHome(System.getProperty("spark.test.home"));
+ SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
+
+ launcher.addSparkArg(opts.HELP);
+ try {
+ launcher.addSparkArg(opts.PROXY_USER);
+ fail("Expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // Expected.
+ }
+
+ launcher.addSparkArg(opts.PROXY_USER, "someUser");
+ try {
+ launcher.addSparkArg(opts.HELP, "someValue");
+ fail("Expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // Expected.
+ }
+
+ launcher.addSparkArg("--future-argument");
+ launcher.addSparkArg("--future-argument", "someValue");
+
+ launcher.addSparkArg(opts.MASTER, "myMaster");
+ assertEquals("myMaster", launcher.builder.master);
+
+ launcher.addJar("foo");
+ launcher.addSparkArg(opts.JARS, "bar");
+ assertEquals(Arrays.asList("bar"), launcher.builder.jars);
+
+ launcher.addFile("foo");
+ launcher.addSparkArg(opts.FILES, "bar");
+ assertEquals(Arrays.asList("bar"), launcher.builder.files);
+
+ launcher.addPyFile("foo");
+ launcher.addSparkArg(opts.PY_FILES, "bar");
+ assertEquals(Arrays.asList("bar"), launcher.builder.pyFiles);
+
+ launcher.setConf("spark.foo", "foo");
+ launcher.addSparkArg(opts.CONF, "spark.foo=bar");
+ assertEquals("bar", launcher.builder.conf.get("spark.foo"));
+ }
+
+ @Test
public void testChildProcLauncher() throws Exception {
+ SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
Map<String, String> env = new HashMap<String, String>();
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
@@ -44,9 +91,12 @@ public class SparkLauncherSuite {
.setSparkHome(System.getProperty("spark.test.home"))
.setMaster("local")
.setAppResource("spark-internal")
+ .addSparkArg(opts.CONF,
+ String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"-Dfoo=bar -Dtest.name=-testChildProcLauncher")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();
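
Note how the test interleaves addSparkArg with the dedicated setters: the --conf and --class arguments added first are overridden by the later setConf and setMainClass calls, since both paths write to the same builder state and the last invocation wins, as the addSparkArg javadoc promises. A minimal illustration with hypothetical property values:

    SparkLauncher launcher = new SparkLauncher()
        .addSparkArg("--conf", "spark.foo=first") // routed to setConf("spark.foo", "first")
        .setConf("spark.foo", "second");          // overrides: spark.foo is now "second"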