aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
Diffstat (limited to 'launcher')
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java101
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java2
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java50
3 files changed, 150 insertions, 3 deletions
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index c0f89c9230..03c9358bc8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -20,12 +20,13 @@ package org.apache.spark.launcher;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
-/**
+/**
* Launcher for Spark applications.
* <p>
* Use this class to start Spark applications programmatically. The class uses a builder pattern
@@ -57,7 +58,8 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";
- private final SparkSubmitCommandBuilder builder;
+ // Visible for testing.
+ final SparkSubmitCommandBuilder builder;
public SparkLauncher() {
this(null);
@@ -188,6 +190,73 @@ public class SparkLauncher {
}
/**
+ * Adds a no-value argument to the Spark invocation. If the argument is known, this method
+ * validates whether the argument is indeed a no-value argument, and throws an exception
+ * otherwise.
+ * <p/>
+ * Use this method with caution. It is possible to create an invalid Spark command by passing
+ * unknown arguments to this method, since those are allowed for forward compatibility.
+ *
+ * @param arg Argument to add.
+ * @return This launcher.
+ */
+ public SparkLauncher addSparkArg(String arg) {
+ SparkSubmitOptionParser validator = new ArgumentValidator(false);
+ validator.parse(Arrays.asList(arg));
+ builder.sparkArgs.add(arg);
+ return this;
+ }
+
+ /**
+ * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
+ * a known argument, the code validates that the argument actually expects a value, and throws
+ * an exception otherwise.
+ * <p/>
+ * It is safe to add arguments modified by other methods in this class (such as
+ * {@link #setMaster(String)} - the last invocation will be the one to take effect.
+ * <p/>
+ * Use this method with caution. It is possible to create an invalid Spark command by passing
+ * unknown arguments to this method, since those are allowed for forward compatibility.
+ *
+ * @param name Name of argument to add.
+ * @param value Value of the argument.
+ * @return This launcher.
+ */
+ public SparkLauncher addSparkArg(String name, String value) {
+ SparkSubmitOptionParser validator = new ArgumentValidator(true);
+ if (validator.MASTER.equals(name)) {
+ setMaster(value);
+ } else if (validator.PROPERTIES_FILE.equals(name)) {
+ setPropertiesFile(value);
+ } else if (validator.CONF.equals(name)) {
+ String[] vals = value.split("=", 2);
+ setConf(vals[0], vals[1]);
+ } else if (validator.CLASS.equals(name)) {
+ setMainClass(value);
+ } else if (validator.JARS.equals(name)) {
+ builder.jars.clear();
+ for (String jar : value.split(",")) {
+ addJar(jar);
+ }
+ } else if (validator.FILES.equals(name)) {
+ builder.files.clear();
+ for (String file : value.split(",")) {
+ addFile(file);
+ }
+ } else if (validator.PY_FILES.equals(name)) {
+ builder.pyFiles.clear();
+ for (String file : value.split(",")) {
+ addPyFile(file);
+ }
+ } else {
+ validator.parse(Arrays.asList(name, value));
+ builder.sparkArgs.add(name);
+ builder.sparkArgs.add(value);
+ }
+ return this;
+ }
+
+ /**
* Adds command line arguments for the application.
*
* @param args Arguments to pass to the application's main class.
@@ -277,4 +346,32 @@ public class SparkLauncher {
return pb.start();
}
+ private static class ArgumentValidator extends SparkSubmitOptionParser {
+
+ private final boolean hasValue;
+
+ ArgumentValidator(boolean hasValue) {
+ this.hasValue = hasValue;
+ }
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ if (value == null && hasValue) {
+ throw new IllegalArgumentException(String.format("'%s' does not expect a value.", opt));
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
+ return true;
+ }
+
+ protected void handleExtraArgs(List<String> extra) {
+ // No op.
+ }
+
+ };
+
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 87c43aa998..4f354cedee 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -76,7 +76,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
"spark-internal");
}
- private final List<String> sparkArgs;
+ final List<String> sparkArgs;
private final boolean printHelp;
/**
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 252d5abae1..d0c26dd056 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.launcher;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -36,7 +37,53 @@ public class SparkLauncherSuite {
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
@Test
+ public void testSparkArgumentHandling() throws Exception {
+ SparkLauncher launcher = new SparkLauncher()
+ .setSparkHome(System.getProperty("spark.test.home"));
+ SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
+
+ launcher.addSparkArg(opts.HELP);
+ try {
+ launcher.addSparkArg(opts.PROXY_USER);
+ fail("Expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // Expected.
+ }
+
+ launcher.addSparkArg(opts.PROXY_USER, "someUser");
+ try {
+ launcher.addSparkArg(opts.HELP, "someValue");
+ fail("Expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // Expected.
+ }
+
+ launcher.addSparkArg("--future-argument");
+ launcher.addSparkArg("--future-argument", "someValue");
+
+ launcher.addSparkArg(opts.MASTER, "myMaster");
+ assertEquals("myMaster", launcher.builder.master);
+
+ launcher.addJar("foo");
+ launcher.addSparkArg(opts.JARS, "bar");
+ assertEquals(Arrays.asList("bar"), launcher.builder.jars);
+
+ launcher.addFile("foo");
+ launcher.addSparkArg(opts.FILES, "bar");
+ assertEquals(Arrays.asList("bar"), launcher.builder.files);
+
+ launcher.addPyFile("foo");
+ launcher.addSparkArg(opts.PY_FILES, "bar");
+ assertEquals(Arrays.asList("bar"), launcher.builder.pyFiles);
+
+ launcher.setConf("spark.foo", "foo");
+ launcher.addSparkArg(opts.CONF, "spark.foo=bar");
+ assertEquals("bar", launcher.builder.conf.get("spark.foo"));
+ }
+
+ @Test
public void testChildProcLauncher() throws Exception {
+ SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
Map<String, String> env = new HashMap<String, String>();
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
@@ -44,9 +91,12 @@ public class SparkLauncherSuite {
.setSparkHome(System.getProperty("spark.test.home"))
.setMaster("local")
.setAppResource("spark-internal")
+ .addSparkArg(opts.CONF,
+ String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"-Dfoo=bar -Dtest.name=-testChildProcLauncher")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();