author     Marcelo Vanzin <vanzin@cloudera.com>      2015-03-11 01:03:01 -0700
committer  Patrick Wendell <patrick@databricks.com>  2015-03-11 01:03:01 -0700
commit     517975d89d40a77c7186f488547eed11f79c1e97 (patch)
tree       51bbc6c180bc28ae45a61511d44f5367f357ffd0 /launcher
parent     2d4e00efe2cf179935ae108a68f28edf6e5a1628 (diff)
[SPARK-4924] Add a library for launching Spark jobs programmatically.
This change encapsulates all the logic involved in launching a Spark job into a small Java library that can be easily embedded into other applications.

The overall goal of this change is twofold, as described in the bug:

- Provide a public API for launching Spark processes. This is a common request from users and currently there's no good answer for it.
- Remove a lot of the duplicated code and other coupling that exists in the different parts of Spark that deal with launching processes.

A lot of the duplication was due to different code needed to build an application's classpath (and the bootstrapper needed to run the driver in certain situations), and also different code needed to parse spark-submit command line options in different contexts. The change centralizes those as much as possible so that all code paths can rely on the library for handling those appropriately.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3916 from vanzin/SPARK-4924 and squashes the following commits:

18c7e4d [Marcelo Vanzin] Fix make-distribution.sh.
2ce741f [Marcelo Vanzin] Add lots of quotes.
3b28a75 [Marcelo Vanzin] Update new pom.
a1b8af1 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
897141f [Marcelo Vanzin] Review feedback.
e2367d2 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
28cd35e [Marcelo Vanzin] Remove stale comment.
b1d86b0 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
00505f9 [Marcelo Vanzin] Add blurb about new API in the programming guide.
5f4ddcc [Marcelo Vanzin] Better usage messages.
92a9cfb [Marcelo Vanzin] Fix Win32 launcher, usage.
6184c07 [Marcelo Vanzin] Rename field.
4c19196 [Marcelo Vanzin] Update comment.
7e66c18 [Marcelo Vanzin] Fix pyspark tests.
0031a8e [Marcelo Vanzin] Review feedback.
c12d84b [Marcelo Vanzin] Review feedback. And fix spark-submit on Windows.
e2d4d71 [Marcelo Vanzin] Simplify some code used to launch pyspark.
43008a7 [Marcelo Vanzin] Don't make builder extend SparkLauncher.
b4d6912 [Marcelo Vanzin] Use spark-submit script in SparkLauncher.
28b1434 [Marcelo Vanzin] Add a comment.
304333a [Marcelo Vanzin] Fix propagation of properties file arg.
bb67b93 [Marcelo Vanzin] Remove unrelated Yarn change (that is also wrong).
8ec0243 [Marcelo Vanzin] Add missing newline.
95ddfa8 [Marcelo Vanzin] Fix handling of --help for spark-class command builder.
72da7ec [Marcelo Vanzin] Rename SparkClassLauncher.
62978e4 [Marcelo Vanzin] Minor cleanup of Windows code path.
9cd5b44 [Marcelo Vanzin] Make all non-public APIs package-private.
e4c80b6 [Marcelo Vanzin] Reorganize the code so that only SparkLauncher is public.
e50dc5e [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
de81da2 [Marcelo Vanzin] Fix CommandUtils.
86a87bf [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
2061967 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
46d46da [Marcelo Vanzin] Clean up a test and make it more future-proof.
b93692a [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
ad03c48 [Marcelo Vanzin] Revert "Fix a thread-safety issue in "local" mode."
0b509d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
23aa2a9 [Marcelo Vanzin] Read java-opts from conf dir, not spark home.
7cff919 [Marcelo Vanzin] Javadoc updates.
eae4d8e [Marcelo Vanzin] Fix new unit tests on Windows.
e570fb5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
44cd5f7 [Marcelo Vanzin] Add package-info.java, clean up javadocs.
f7cacff [Marcelo Vanzin] Remove "launch Spark in new thread" feature.
7ed8859 [Marcelo Vanzin] Some more feedback.
54cd4fd [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
61919df [Marcelo Vanzin] Clean leftover debug statement.
aae5897 [Marcelo Vanzin] Use launcher classes instead of jars in non-release mode.
e584fc3 [Marcelo Vanzin] Rework command building a little bit.
525ef5b [Marcelo Vanzin] Rework Unix spark-class to handle argument with newlines.
8ac4e92 [Marcelo Vanzin] Minor test cleanup.
e946a99 [Marcelo Vanzin] Merge PySparkLauncher into SparkSubmitCliLauncher.
c617539 [Marcelo Vanzin] Review feedback round 1.
fc6a3e2 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
f26556b [Marcelo Vanzin] Fix a thread-safety issue in "local" mode.
2f4e8b4 [Marcelo Vanzin] Changes needed to make this work with SPARK-4048.
799fc20 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
bb5d324 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
53faef1 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
a7936ef [Marcelo Vanzin] Fix pyspark tests.
656374e [Marcelo Vanzin] Mima fixes.
4d511e7 [Marcelo Vanzin] Fix tools search code.
7a01e4a [Marcelo Vanzin] Fix pyspark on Yarn.
1b3f6e9 [Marcelo Vanzin] Call SparkSubmit from spark-class launcher for unknown classes.
25c5ae6 [Marcelo Vanzin] Centralize SparkSubmit command line parsing.
27be98a [Marcelo Vanzin] Modify Spark to use launcher lib.
6f70eea [Marcelo Vanzin] [SPARK-4924] Add a library for launching Spark jobs programatically.
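
For orientation, a minimal usage sketch of the new public API, assembled only from the SparkLauncher methods introduced in this patch. The Spark home, jar path, main class, master and app argument below are placeholders, not values from the patch:

import org.apache.spark.launcher.SparkLauncher;

public class LaunchExample {
  public static void main(String[] args) throws Exception {
    // Placeholder paths, class name and master; adjust for a real application.
    Process spark = new SparkLauncher()
        .setSparkHome("/opt/spark")
        .setAppResource("/path/to/app.jar")
        .setMainClass("com.example.MyApp")
        .setMaster("local[*]")
        .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
        .addAppArgs("input.txt")
        .launch();
    // launch() returns a plain java.lang.Process; wait for the child to exit.
    int exitCode = spark.waitFor();
    System.out.println("Spark application exited with code " + exitCode);
  }
}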
Diffstat (limited to 'launcher')
-rw-r--r--  launcher/pom.xml  83
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java  362
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java  296
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/Main.java  173
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java  108
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java  279
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java  327
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java  224
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/package-info.java  45
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java  101
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java  94
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java  278
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java  108
-rw-r--r--  launcher/src/test/resources/log4j.properties  31
14 files changed, 2509 insertions, 0 deletions
diff --git a/launcher/pom.xml b/launcher/pom.xml
new file mode 100644
index 0000000000..ccbd9d0419
--- /dev/null
+++ b/launcher/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Launcher Project</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>launcher</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <!-- NOTE: only test-scope dependencies are allowed in this module. -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Not needed by the test code, but referenced by SparkSubmit which is used by the tests. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
new file mode 100644
index 0000000000..dc90e9e987
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarFile;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Abstract Spark command builder that defines common functionality.
+ */
+abstract class AbstractCommandBuilder {
+
+ boolean verbose;
+ String appName;
+ String appResource;
+ String deployMode;
+ String javaHome;
+ String mainClass;
+ String master;
+ String propertiesFile;
+ final List<String> appArgs;
+ final List<String> jars;
+ final List<String> files;
+ final List<String> pyFiles;
+ final Map<String, String> childEnv;
+ final Map<String, String> conf;
+
+ public AbstractCommandBuilder() {
+ this.appArgs = new ArrayList<String>();
+ this.childEnv = new HashMap<String, String>();
+ this.conf = new HashMap<String, String>();
+ this.files = new ArrayList<String>();
+ this.jars = new ArrayList<String>();
+ this.pyFiles = new ArrayList<String>();
+ }
+
+ /**
+ * Builds the command to execute.
+ *
+ * @param env A map containing environment variables for the child process. It may already contain
+ * entries defined by the user (such as SPARK_HOME, or those defined by the
+ * SparkLauncher constructor that takes an environment), and may be modified to
+ * include other variables needed by the process to be executed.
+ */
+ abstract List<String> buildCommand(Map<String, String> env) throws IOException;
+
+ /**
+ * Builds a list of arguments to run java.
+ *
+ * This method finds the java executable to use and appends JVM-specific options for running a
+ * class with Spark in the classpath. It also loads options from the "java-opts" file in the
+ * configuration directory being used.
+ *
+ * Callers should still add at least the class to run, as well as any arguments to pass to the
+ * class.
+ */
+ List<String> buildJavaCommand(String extraClassPath) throws IOException {
+ List<String> cmd = new ArrayList<String>();
+ if (javaHome == null) {
+ cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
+ } else {
+ cmd.add(join(File.separator, javaHome, "bin", "java"));
+ }
+
+ // Load extra JAVA_OPTS from conf/java-opts, if it exists.
+ File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
+ if (javaOpts.isFile()) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(javaOpts), "UTF-8"));
+ try {
+ String line;
+ while ((line = br.readLine()) != null) {
+ addOptionString(cmd, line);
+ }
+ } finally {
+ br.close();
+ }
+ }
+
+ cmd.add("-cp");
+ cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
+ return cmd;
+ }
+
+ /**
+ * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
+ * set it.
+ */
+ void addPermGenSizeOpt(List<String> cmd) {
+ // Don't set MaxPermSize for Java 8 and later.
+ String[] version = System.getProperty("java.version").split("\\.");
+ if (Integer.parseInt(version[0]) > 1 || Integer.parseInt(version[1]) > 7) {
+ return;
+ }
+
+ for (String arg : cmd) {
+ if (arg.startsWith("-XX:MaxPermSize=")) {
+ return;
+ }
+ }
+
+ cmd.add("-XX:MaxPermSize=128m");
+ }
+
+ void addOptionString(List<String> cmd, String options) {
+ if (!isEmpty(options)) {
+ for (String opt : parseOptionString(options)) {
+ cmd.add(opt);
+ }
+ }
+ }
+
+ /**
+ * Builds the classpath for the application. Returns a list with one classpath entry per element;
+ * each entry is formatted in the way expected by <i>java.net.URLClassLoader</i> (more
+ * specifically, with trailing slashes for directories).
+ */
+ List<String> buildClassPath(String appClassPath) throws IOException {
+ String sparkHome = getSparkHome();
+ String scala = getScalaVersion();
+
+ List<String> cp = new ArrayList<String>();
+ addToClassPath(cp, getenv("SPARK_CLASSPATH"));
+ addToClassPath(cp, appClassPath);
+
+ addToClassPath(cp, getConfDir());
+
+ boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
+ boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
+ if (prependClasses || isTesting) {
+ List<String> projects = Arrays.asList("core", "repl", "mllib", "bagel", "graphx",
+ "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
+ "yarn", "launcher");
+ if (prependClasses) {
+ System.err.println(
+ "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
+ "assembly.");
+ for (String project : projects) {
+ addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
+ scala));
+ }
+ }
+ if (isTesting) {
+ for (String project : projects) {
+ addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome,
+ project, scala));
+ }
+ }
+
+ // Add this path to include jars that are shaded in the final deliverable created during
+ // the maven build. These jars are copied to this directory during the build.
+ addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
+ }
+
+ String assembly = findAssembly(scala);
+ addToClassPath(cp, assembly);
+
+ // When Hive support is needed, Datanucleus jars must be included on the classpath. Datanucleus
+ // jars do not work if only included in the uber jar as plugin.xml metadata is lost. Both sbt
+ // and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is built
+ // with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
+ // assembly is built for Hive, before actually populating the CLASSPATH with the jars.
+ //
+ // This block also serves as a check for SPARK-1703, when the assembly jar is built with
+ // Java 7 and ends up with too many files, causing issues with other JDK versions.
+ boolean needsDataNucleus = false;
+ JarFile assemblyJar = null;
+ try {
+ assemblyJar = new JarFile(assembly);
+ needsDataNucleus = assemblyJar.getEntry("org/apache/hadoop/hive/ql/exec/") != null;
+ } catch (IOException ioe) {
+ if (ioe.getMessage().indexOf("invalid CEN header") >= 0) {
+ System.err.println(
+ "Loading Spark jar failed.\n" +
+ "This is likely because Spark was compiled with Java 7 and run\n" +
+ "with Java 6 (see SPARK-1703). Please use Java 7 to run Spark\n" +
+ "or build Spark with Java 6.");
+ System.exit(1);
+ } else {
+ throw ioe;
+ }
+ } finally {
+ if (assemblyJar != null) {
+ try {
+ assemblyJar.close();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
+ }
+
+ if (needsDataNucleus) {
+ System.err.println("Spark assembly has been built with Hive, including Datanucleus jars " +
+ "in classpath.");
+ File libdir;
+ if (new File(sparkHome, "RELEASE").isFile()) {
+ libdir = new File(sparkHome, "lib");
+ } else {
+ libdir = new File(sparkHome, "lib_managed/jars");
+ }
+
+ checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+ libdir.getAbsolutePath());
+ for (File jar : libdir.listFiles()) {
+ if (jar.getName().startsWith("datanucleus-")) {
+ addToClassPath(cp, jar.getAbsolutePath());
+ }
+ }
+ }
+
+ addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
+ addToClassPath(cp, getenv("YARN_CONF_DIR"));
+ addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
+ return cp;
+ }
+
+ /**
+ * Adds entries to the classpath.
+ *
+ * @param cp List to which the new entries are appended.
+ * @param entries New classpath entries (separated by File.pathSeparator).
+ */
+ private void addToClassPath(List<String> cp, String entries) {
+ if (isEmpty(entries)) {
+ return;
+ }
+ String[] split = entries.split(Pattern.quote(File.pathSeparator));
+ for (String entry : split) {
+ if (!isEmpty(entry)) {
+ if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) {
+ entry += File.separator;
+ }
+ cp.add(entry);
+ }
+ }
+ }
+
+ String getScalaVersion() {
+ String scala = getenv("SPARK_SCALA_VERSION");
+ if (scala != null) {
+ return scala;
+ }
+
+ String sparkHome = getSparkHome();
+ File scala210 = new File(sparkHome, "assembly/target/scala-2.10");
+ File scala211 = new File(sparkHome, "assembly/target/scala-2.11");
+ checkState(!scala210.isDirectory() || !scala211.isDirectory(),
+ "Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
+ "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
+ if (scala210.isDirectory()) {
+ return "2.10";
+ } else {
+ checkState(scala211.isDirectory(), "Cannot find any assembly build directories.");
+ return "2.11";
+ }
+ }
+
+ String getSparkHome() {
+ String path = getenv(ENV_SPARK_HOME);
+ checkState(path != null,
+ "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+ return path;
+ }
+
+ /**
+ * Loads the configuration file for the application, if it exists. This is either the
+ * user-specified properties file, or the spark-defaults.conf file under the Spark configuration
+ * directory.
+ */
+ Properties loadPropertiesFile() throws IOException {
+ Properties props = new Properties();
+ File propsFile;
+ if (propertiesFile != null) {
+ propsFile = new File(propertiesFile);
+ checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
+ } else {
+ propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
+ }
+
+ if (propsFile.isFile()) {
+ FileInputStream fd = null;
+ try {
+ fd = new FileInputStream(propsFile);
+ props.load(new InputStreamReader(fd, "UTF-8"));
+ } finally {
+ if (fd != null) {
+ try {
+ fd.close();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
+ }
+ }
+
+ return props;
+ }
+
+ String getenv(String key) {
+ return firstNonEmpty(childEnv.get(key), System.getenv(key));
+ }
+
+ private String findAssembly(String scalaVersion) {
+ String sparkHome = getSparkHome();
+ File libdir;
+ if (new File(sparkHome, "RELEASE").isFile()) {
+ libdir = new File(sparkHome, "lib");
+ checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+ libdir.getAbsolutePath());
+ } else {
+ libdir = new File(sparkHome, String.format("assembly/target/scala-%s", scalaVersion));
+ }
+
+ final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
+ FileFilter filter = new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.isFile() && re.matcher(file.getName()).matches();
+ }
+ };
+ File[] assemblies = libdir.listFiles(filter);
+ checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
+ checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
+ return assemblies[0].getAbsolutePath();
+ }
+
+ private String getConfDir() {
+ String confDir = getenv("SPARK_CONF_DIR");
+ return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
new file mode 100644
index 0000000000..9b04732afe
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods for command builders.
+ */
+class CommandBuilderUtils {
+
+ static final String DEFAULT_MEM = "512m";
+ static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
+ static final String ENV_SPARK_HOME = "SPARK_HOME";
+
+ /** Returns whether the given string is null or empty. */
+ static boolean isEmpty(String s) {
+ return s == null || s.isEmpty();
+ }
+
+ /** Joins a list of strings using the given separator. */
+ static String join(String sep, String... elements) {
+ StringBuilder sb = new StringBuilder();
+ for (String e : elements) {
+ if (e != null) {
+ if (sb.length() > 0) {
+ sb.append(sep);
+ }
+ sb.append(e);
+ }
+ }
+ return sb.toString();
+ }
+
+ /** Joins a list of strings using the given separator. */
+ static String join(String sep, Iterable<String> elements) {
+ StringBuilder sb = new StringBuilder();
+ for (String e : elements) {
+ if (e != null) {
+ if (sb.length() > 0) {
+ sb.append(sep);
+ }
+ sb.append(e);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Returns the first non-empty value mapped to the given key in the given maps, or null otherwise.
+ */
+ static String firstNonEmptyValue(String key, Map<?, ?>... maps) {
+ for (Map<?, ?> map : maps) {
+ String value = (String) map.get(key);
+ if (!isEmpty(value)) {
+ return value;
+ }
+ }
+ return null;
+ }
+
+ /** Returns the first non-empty, non-null string in the given list, or null otherwise. */
+ static String firstNonEmpty(String... candidates) {
+ for (String s : candidates) {
+ if (!isEmpty(s)) {
+ return s;
+ }
+ }
+ return null;
+ }
+
+ /** Returns the name of the env variable that holds the native library path. */
+ static String getLibPathEnvName() {
+ if (isWindows()) {
+ return "PATH";
+ }
+
+ String os = System.getProperty("os.name");
+ if (os.startsWith("Mac OS X")) {
+ return "DYLD_LIBRARY_PATH";
+ } else {
+ return "LD_LIBRARY_PATH";
+ }
+ }
+
+ /** Returns whether the OS is Windows. */
+ static boolean isWindows() {
+ String os = System.getProperty("os.name");
+ return os.startsWith("Windows");
+ }
+
+ /**
+ * Updates the user environment, appending the given pathList to the existing value of the given
+ * environment variable (or setting it if it hasn't yet been set).
+ */
+ static void mergeEnvPathList(Map<String, String> userEnv, String envKey, String pathList) {
+ if (!isEmpty(pathList)) {
+ String current = firstNonEmpty(userEnv.get(envKey), System.getenv(envKey));
+ userEnv.put(envKey, join(File.pathSeparator, current, pathList));
+ }
+ }
+
+ /**
+ * Parse a string as if it were a list of arguments, following bash semantics.
+ * For example:
+ *
+ * Input: "\"ab cd\" efgh 'i \" j'"
+ * Output: [ "ab cd", "efgh", "i \" j" ]
+ */
+ static List<String> parseOptionString(String s) {
+ List<String> opts = new ArrayList<String>();
+ StringBuilder opt = new StringBuilder();
+ boolean inOpt = false;
+ boolean inSingleQuote = false;
+ boolean inDoubleQuote = false;
+ boolean escapeNext = false;
+
+ // This is needed to detect when a quoted empty string is used as an argument ("" or '').
+ boolean hasData = false;
+
+ for (int i = 0; i < s.length(); i++) {
+ int c = s.codePointAt(i);
+ if (escapeNext) {
+ opt.appendCodePoint(c);
+ escapeNext = false;
+ } else if (inOpt) {
+ switch (c) {
+ case '\\':
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ escapeNext = true;
+ }
+ break;
+ case '\'':
+ if (inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inSingleQuote = !inSingleQuote;
+ }
+ break;
+ case '"':
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inDoubleQuote = !inDoubleQuote;
+ }
+ break;
+ default:
+ if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ opts.add(opt.toString());
+ opt.setLength(0);
+ inOpt = false;
+ hasData = false;
+ }
+ }
+ } else {
+ switch (c) {
+ case '\'':
+ inSingleQuote = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ case '"':
+ inDoubleQuote = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ case '\\':
+ escapeNext = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ default:
+ if (!Character.isWhitespace(c)) {
+ inOpt = true;
+ hasData = true;
+ opt.appendCodePoint(c);
+ }
+ }
+ }
+ }
+
+ checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s);
+ if (hasData) {
+ opts.add(opt.toString());
+ }
+ return opts;
+ }
+
+ /** Throws IllegalArgumentException if the given object is null. */
+ static void checkNotNull(Object o, String arg) {
+ if (o == null) {
+ throw new IllegalArgumentException(String.format("'%s' must not be null.", arg));
+ }
+ }
+
+ /** Throws IllegalArgumentException with the given message if the check is false. */
+ static void checkArgument(boolean check, String msg, Object... args) {
+ if (!check) {
+ throw new IllegalArgumentException(String.format(msg, args));
+ }
+ }
+
+ /** Throws IllegalStateException with the given message if the check is false. */
+ static void checkState(boolean check, String msg, Object... args) {
+ if (!check) {
+ throw new IllegalStateException(String.format(msg, args));
+ }
+ }
+
+ /**
+ * Quote a command argument for a command to be run by a Windows batch script, if the argument
+ * needs quoting. Arguments only seem to need quotes in batch scripts if they have certain
+ * special characters, some of which need extra (and different) escaping.
+ *
+ * For example:
+ * original single argument: ab="cde fgh"
+ * quoted: "ab^=""cde fgh"""
+ */
+ static String quoteForBatchScript(String arg) {
+
+ boolean needsQuotes = false;
+ for (int i = 0; i < arg.length(); i++) {
+ int c = arg.codePointAt(i);
+ if (Character.isWhitespace(c) || c == '"' || c == '=') {
+ needsQuotes = true;
+ break;
+ }
+ }
+ if (!needsQuotes) {
+ return arg;
+ }
+ StringBuilder quoted = new StringBuilder();
+ quoted.append("\"");
+ for (int i = 0; i < arg.length(); i++) {
+ int cp = arg.codePointAt(i);
+ switch (cp) {
+ case '"':
+ quoted.append('"');
+ break;
+
+ case '=':
+ quoted.append('^');
+ break;
+
+ default:
+ break;
+ }
+ quoted.appendCodePoint(cp);
+ }
+ quoted.append("\"");
+ return quoted.toString();
+ }
+
+ /**
+ * Quotes a string so that it can be used in a command string and be parsed back into a single
+ * argument by python's "shlex.split()" function.
+ *
+ * Basically, just add simple escapes. E.g.:
+ * original single argument : ab "cd" ef
+ * after: "ab \"cd\" ef"
+ */
+ static String quoteForPython(String s) {
+ StringBuilder quoted = new StringBuilder().append('"');
+ for (int i = 0; i < s.length(); i++) {
+ int cp = s.codePointAt(i);
+ if (cp == '"' || cp == '\\') {
+ quoted.appendCodePoint('\\');
+ }
+ quoted.appendCodePoint(cp);
+ }
+ return quoted.append('"').toString();
+ }
+
+}
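
As an illustration of the documented behavior of the helpers above, here is a small hypothetical harness (not part of this patch). It would have to live in the org.apache.spark.launcher package because CommandBuilderUtils is package-private:

package org.apache.spark.launcher;

import java.util.List;

class CommandBuilderUtilsDemo {
  public static void main(String[] args) {
    // Bash-like option parsing, per the parseOptionString javadoc example:
    // "ab cd" efgh 'i " j'  ->  [ab cd, efgh, i " j]
    List<String> opts = CommandBuilderUtils.parseOptionString("\"ab cd\" efgh 'i \" j'");
    System.out.println(opts);

    // Windows batch quoting: ab="cde fgh"  ->  "ab^=""cde fgh"""
    System.out.println(CommandBuilderUtils.quoteForBatchScript("ab=\"cde fgh\""));

    // Python shlex-compatible quoting: ab "cd" ef  ->  "ab \"cd\" ef"
    System.out.println(CommandBuilderUtils.quoteForPython("ab \"cd\" ef"));
  }
}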
diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java
new file mode 100644
index 0000000000..206acfb514
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command line interface for the Spark launcher. Used internally by Spark scripts.
+ */
+class Main {
+
+ /**
+ * Usage: Main [class] [class args]
+ * <p/>
+ * This CLI works in two different modes:
+ * <ul>
+ * <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
+ * {@link SparkLauncher} class is used to launch a Spark application.</li>
+ * <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
+ * </ul>
+ *
+ * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
+ * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
+ * <p/>
+ * On Unix-like systems, the output is a list of command arguments, separated by the NULL
+ * character. On Windows, the output is a command line suitable for direct execution from the
+ * script.
+ */
+ public static void main(String[] argsArray) throws Exception {
+ checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
+
+ List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
+ String className = args.remove(0);
+
+ boolean printLaunchCommand;
+ boolean printUsage;
+ AbstractCommandBuilder builder;
+ try {
+ if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
+ builder = new SparkSubmitCommandBuilder(args);
+ } else {
+ builder = new SparkClassCommandBuilder(className, args);
+ }
+ printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
+ printUsage = false;
+ } catch (IllegalArgumentException e) {
+ builder = new UsageCommandBuilder(e.getMessage());
+ printLaunchCommand = false;
+ printUsage = true;
+ }
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = builder.buildCommand(env);
+ if (printLaunchCommand) {
+ System.err.println("Spark Command: " + join(" ", cmd));
+ System.err.println("========================================");
+ }
+
+ if (isWindows()) {
+ // When printing the usage message, we can't use "cmd /v" since that prevents the env
+ // variable from being seen in the caller script. So do not call prepareWindowsCommand().
+ if (printUsage) {
+ System.out.println(join(" ", cmd));
+ } else {
+ System.out.println(prepareWindowsCommand(cmd, env));
+ }
+ } else {
+ // In bash, use NULL as the arg separator since it cannot be used in an argument.
+ List<String> bashCmd = prepareBashCommand(cmd, env);
+ for (String c : bashCmd) {
+ System.out.print(c);
+ System.out.print('\0');
+ }
+ }
+ }
+
+ /**
+ * Prepare a command line for execution from a Windows batch script.
+ *
+ * The method quotes all arguments so that spaces are handled as expected. Quotes within arguments
+ * are "double quoted" (which is batch for escaping a quote). This page has more details about
+ * quoting and other batch script fun stuff: http://ss64.com/nt/syntax-esc.html
+ *
+ * The command is executed using "cmd /c" and formatted in single line, since that's the
+ * easiest way to consume this from a batch script (see spark-class2.cmd).
+ */
+ private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
+ StringBuilder cmdline = new StringBuilder("cmd /c \"");
+ for (Map.Entry<String, String> e : childEnv.entrySet()) {
+ cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
+ cmdline.append(" && ");
+ }
+ for (String arg : cmd) {
+ cmdline.append(quoteForBatchScript(arg));
+ cmdline.append(" ");
+ }
+ cmdline.append("\"");
+ return cmdline.toString();
+ }
+
+ /**
+ * Prepare the command for execution from a bash script. The final command will have commands to
+ * set up any needed environment variables needed by the child process.
+ */
+ private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
+ if (childEnv.isEmpty()) {
+ return cmd;
+ }
+
+ List<String> newCmd = new ArrayList<String>();
+ newCmd.add("env");
+
+ for (Map.Entry<String, String> e : childEnv.entrySet()) {
+ newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+ newCmd.addAll(cmd);
+ return newCmd;
+ }
+
+ /**
+ * Internal builder used when command line parsing fails. This will behave differently depending
+ * on the platform:
+ *
+ * - On Unix-like systems, it will print a call to the "usage" function with two arguments: the
+ * error string, and the exit code to use. The function is expected to print the command's
+ * usage and exit with the provided exit code. The script should use "export -f usage" after
+ * declaring a function called "usage", so that the function is available to downstream scripts.
+ *
+ * - On Windows it will set the variable "SPARK_LAUNCHER_USAGE_ERROR" to the usage error message.
+ * The batch script should check for this variable and print its usage, since batch scripts
+ * don't really support the "export -f" functionality used in bash.
+ */
+ private static class UsageCommandBuilder extends AbstractCommandBuilder {
+
+ private final String message;
+
+ UsageCommandBuilder(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) {
+ if (isWindows()) {
+ return Arrays.asList("set", "SPARK_LAUNCHER_USAGE_ERROR=" + message);
+ } else {
+ return Arrays.asList("usage", message, "1");
+ }
+ }
+
+ }
+
+}
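
To make the NUL-separated output format described in the javadoc above concrete, here is a hypothetical sketch of splitting such output back into arguments (the real consumers are the bin/spark-class shell and batch scripts; the command string below is made up):

import java.util.Arrays;
import java.util.List;

class NulSeparatedCommandDemo {
  public static void main(String[] args) {
    // Example of what Main prints on Unix-like systems: tokens separated by '\0'.
    String raw = "java\0-cp\0/opt/spark/conf\0org.apache.spark.deploy.master.Master\0";
    // String.split drops trailing empty strings, so the trailing NUL adds no empty token.
    List<String> cmd = Arrays.asList(raw.split("\0"));
    System.out.println(cmd);  // [java, -cp, /opt/spark/conf, org.apache.spark.deploy.master.Master]
  }
}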
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
new file mode 100644
index 0000000000..e601a0a19f
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command builder for internal Spark classes.
+ * <p/>
+ * This class handles building the command to launch all internal Spark classes except for
+ * SparkSubmit (which is handled by the {@link SparkSubmitCommandBuilder} class).
+ */
+class SparkClassCommandBuilder extends AbstractCommandBuilder {
+
+ private final String className;
+ private final List<String> classArgs;
+
+ SparkClassCommandBuilder(String className, List<String> classArgs) {
+ this.className = className;
+ this.classArgs = classArgs;
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) throws IOException {
+ List<String> javaOptsKeys = new ArrayList<String>();
+ String memKey = null;
+ String extraClassPath = null;
+
+ // Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) +
+ // SPARK_DAEMON_MEMORY.
+ if (className.equals("org.apache.spark.deploy.master.Master")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_MASTER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_WORKER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_HISTORY_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.startsWith("org.apache.spark.tools.")) {
+ String sparkHome = getSparkHome();
+ File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
+ "scala-" + getScalaVersion()));
+ checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
+
+ Pattern re = Pattern.compile("spark-tools_.*\\.jar");
+ for (File f : toolsDir.listFiles()) {
+ if (re.matcher(f.getName()).matches()) {
+ extraClassPath = f.getAbsolutePath();
+ break;
+ }
+ }
+
+ checkState(extraClassPath != null,
+ "Failed to find Spark Tools Jar in %s.\n" +
+ "You need to run \"build/sbt tools/package\" before running %s.",
+ toolsDir.getAbsolutePath(), className);
+
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ }
+
+ List<String> cmd = buildJavaCommand(extraClassPath);
+ for (String key : javaOptsKeys) {
+ addOptionString(cmd, System.getenv(key));
+ }
+
+ String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
+ cmd.add("-Xms" + mem);
+ cmd.add("-Xmx" + mem);
+ addPermGenSizeOpt(cmd);
+ cmd.add(className);
+ cmd.addAll(classArgs);
+ return cmd;
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
new file mode 100644
index 0000000000..b566507ee6
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Launcher for Spark applications.
+ * <p/>
+ * Use this class to start Spark applications programmatically. The class uses a builder pattern
+ * to allow clients to configure the Spark application and launch it as a child process.
+ */
+public class SparkLauncher {
+
+ /** The Spark master. */
+ public static final String SPARK_MASTER = "spark.master";
+
+ /** Configuration key for the driver memory. */
+ public static final String DRIVER_MEMORY = "spark.driver.memory";
+ /** Configuration key for the driver class path. */
+ public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+ /** Configuration key for the driver VM options. */
+ public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
+ /** Configuration key for the driver native library path. */
+ public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
+
+ /** Configuration key for the executor memory. */
+ public static final String EXECUTOR_MEMORY = "spark.executor.memory";
+ /** Configuration key for the executor class path. */
+ public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
+ /** Configuration key for the executor VM options. */
+ public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
+ /** Configuration key for the executor native library path. */
+ public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
+ /** Configuration key for the number of executor CPU cores. */
+ public static final String EXECUTOR_CORES = "spark.executor.cores";
+
+ private final SparkSubmitCommandBuilder builder;
+
+ public SparkLauncher() {
+ this(null);
+ }
+
+ /**
+ * Creates a launcher that will set the given environment variables in the child.
+ *
+ * @param env Environment variables to set.
+ */
+ public SparkLauncher(Map<String, String> env) {
+ this.builder = new SparkSubmitCommandBuilder();
+ if (env != null) {
+ this.builder.childEnv.putAll(env);
+ }
+ }
+
+ /**
+ * Set a custom JAVA_HOME for launching the Spark application.
+ *
+ * @param javaHome Path to the JAVA_HOME to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setJavaHome(String javaHome) {
+ checkNotNull(javaHome, "javaHome");
+ builder.javaHome = javaHome;
+ return this;
+ }
+
+ /**
+ * Set a custom Spark installation location for the application.
+ *
+ * @param sparkHome Path to the Spark installation to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setSparkHome(String sparkHome) {
+ checkNotNull(sparkHome, "sparkHome");
+ builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
+ return this;
+ }
+
+ /**
+ * Set a custom properties file with Spark configuration for the application.
+ *
+ * @param path Path to custom properties file to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setPropertiesFile(String path) {
+ checkNotNull(path, "path");
+ builder.propertiesFile = path;
+ return this;
+ }
+
+ /**
+ * Set a single configuration value for the application.
+ *
+ * @param key Configuration key.
+ * @param value The value to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setConf(String key, String value) {
+ checkNotNull(key, "key");
+ checkNotNull(value, "value");
+ checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
+ builder.conf.put(key, value);
+ return this;
+ }
+
+ /**
+ * Set the application name.
+ *
+ * @param appName Application name.
+ * @return This launcher.
+ */
+ public SparkLauncher setAppName(String appName) {
+ checkNotNull(appName, "appName");
+ builder.appName = appName;
+ return this;
+ }
+
+ /**
+ * Set the Spark master for the application.
+ *
+ * @param master Spark master.
+ * @return This launcher.
+ */
+ public SparkLauncher setMaster(String master) {
+ checkNotNull(master, "master");
+ builder.master = master;
+ return this;
+ }
+
+ /**
+ * Set the deploy mode for the application.
+ *
+ * @param mode Deploy mode.
+ * @return This launcher.
+ */
+ public SparkLauncher setDeployMode(String mode) {
+ checkNotNull(mode, "mode");
+ builder.deployMode = mode;
+ return this;
+ }
+
+ /**
+ * Set the main application resource. This should be the location of a jar file for Scala/Java
+ * applications, or a python script for PySpark applications.
+ *
+ * @param resource Path to the main application resource.
+ * @return This launcher.
+ */
+ public SparkLauncher setAppResource(String resource) {
+ checkNotNull(resource, "resource");
+ builder.appResource = resource;
+ return this;
+ }
+
+ /**
+ * Sets the application class name for Java/Scala applications.
+ *
+ * @param mainClass Application's main class.
+ * @return This launcher.
+ */
+ public SparkLauncher setMainClass(String mainClass) {
+ checkNotNull(mainClass, "mainClass");
+ builder.mainClass = mainClass;
+ return this;
+ }
+
+ /**
+ * Adds command line arguments for the application.
+ *
+ * @param args Arguments to pass to the application's main class.
+ * @return This launcher.
+ */
+ public SparkLauncher addAppArgs(String... args) {
+ for (String arg : args) {
+ checkNotNull(arg, "arg");
+ builder.appArgs.add(arg);
+ }
+ return this;
+ }
+
+ /**
+ * Adds a jar file to be submitted with the application.
+ *
+ * @param jar Path to the jar file.
+ * @return This launcher.
+ */
+ public SparkLauncher addJar(String jar) {
+ checkNotNull(jar, "jar");
+ builder.jars.add(jar);
+ return this;
+ }
+
+ /**
+ * Adds a file to be submitted with the application.
+ *
+ * @param file Path to the file.
+ * @return This launcher.
+ */
+ public SparkLauncher addFile(String file) {
+ checkNotNull(file, "file");
+ builder.files.add(file);
+ return this;
+ }
+
+ /**
+ * Adds a python file / zip / egg to be submitted with the application.
+ *
+ * @param file Path to the file.
+ * @return This launcher.
+ */
+ public SparkLauncher addPyFile(String file) {
+ checkNotNull(file, "file");
+ builder.pyFiles.add(file);
+ return this;
+ }
+
+ /**
+ * Enables verbose reporting for SparkSubmit.
+ *
+ * @param verbose Whether to enable verbose output.
+ * @return This launcher.
+ */
+ public SparkLauncher setVerbose(boolean verbose) {
+ builder.verbose = verbose;
+ return this;
+ }
+
+ /**
+ * Launches a sub-process that will start the configured Spark application.
+ *
+ * @return A process handle for the Spark app.
+ */
+ public Process launch() throws IOException {
+ List<String> cmd = new ArrayList<String>();
+ String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
+ cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
+ cmd.addAll(builder.buildSparkSubmitArgs());
+
+ // Since the child process is a batch script, let's quote things so that special characters are
+ // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
+ // weird.
+ if (isWindows()) {
+ List<String> winCmd = new ArrayList<String>();
+ for (String arg : cmd) {
+ winCmd.add(quoteForBatchScript(arg));
+ }
+ cmd = winCmd;
+ }
+
+ ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
+ for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
+ pb.environment().put(e.getKey(), e.getValue());
+ }
+ return pb.start();
+ }
+
+}
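
A practical note on launch() above: it returns a raw java.lang.Process, so callers should consume (or redirect) the child's output streams to keep the child from blocking on a full pipe. A hedged sketch, again with placeholder resource and class names:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.spark.launcher.SparkLauncher;

class LaunchAndTail {
  public static void main(String[] args) throws Exception {
    Process app = new SparkLauncher()
        .setAppResource("/path/to/app.jar")   // placeholder
        .setMainClass("com.example.MyApp")    // placeholder
        .setMaster("local[*]")
        .launch();

    // Forward the child's stderr (where spark-submit typically logs) to this process.
    BufferedReader err =
        new BufferedReader(new InputStreamReader(app.getErrorStream(), "UTF-8"));
    String line;
    while ((line = err.readLine()) != null) {
      System.err.println(line);
    }
    System.exit(app.waitFor());
  }
}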
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
new file mode 100644
index 0000000000..6ffdff63d3
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Special command builder for handling a CLI invocation of SparkSubmit.
+ * <p/>
+ * This builder adds command line parsing compatible with SparkSubmit. It handles setting
+ * driver-side options and special parsing behavior needed for special-casing certain internal
+ * Spark applications.
+ * <p/>
+ * This class has also some special features to aid launching pyspark.
+ */
+class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
+
+ /**
+ * Name of the app resource used to identify the PySpark shell. The command line parser expects
+ * the resource name to be the very first argument to spark-submit in this case.
+ *
+ * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
+ * (see java_gateway.py), and can cause this code to enter into an infinite loop.
+ */
+ static final String PYSPARK_SHELL = "pyspark-shell-main";
+
+ /**
+ * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
+ */
+ static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";
+
+ /**
+ * This map must match the class names for available special classes, since this modifies the way
+ * command line parsing works. This maps the class name to the resource to use when calling
+ * spark-submit.
+ */
+ private static final Map<String, String> specialClasses = new HashMap<String, String>();
+ static {
+ specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
+ specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
+ "spark-internal");
+ specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
+ "spark-internal");
+ }
+
+ private final List<String> sparkArgs;
+
+ /**
+ * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
+ * to parse the command lines for things like bin/spark-shell, which allows users to mix and
+ * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
+ */
+ private boolean allowsMixedArguments;
+
+ SparkSubmitCommandBuilder() {
+ this.sparkArgs = new ArrayList<String>();
+ }
+
+ SparkSubmitCommandBuilder(List<String> args) {
+ this();
+ List<String> submitArgs = args;
+ if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
+ this.allowsMixedArguments = true;
+ appResource = PYSPARK_SHELL_RESOURCE;
+ submitArgs = args.subList(1, args.size());
+ } else {
+ this.allowsMixedArguments = false;
+ }
+
+ new OptionParser().parse(submitArgs);
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) throws IOException {
+ if (PYSPARK_SHELL_RESOURCE.equals(appResource)) {
+ return buildPySparkShellCommand(env);
+ } else {
+ return buildSparkSubmitCommand(env);
+ }
+ }
+
+ List<String> buildSparkSubmitArgs() {
+ List<String> args = new ArrayList<String>();
+ SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+
+ if (verbose) {
+ args.add(parser.VERBOSE);
+ }
+
+ if (master != null) {
+ args.add(parser.MASTER);
+ args.add(master);
+ }
+
+ if (deployMode != null) {
+ args.add(parser.DEPLOY_MODE);
+ args.add(deployMode);
+ }
+
+ if (appName != null) {
+ args.add(parser.NAME);
+ args.add(appName);
+ }
+
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ args.add(parser.CONF);
+ args.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+
+ if (propertiesFile != null) {
+ args.add(parser.PROPERTIES_FILE);
+ args.add(propertiesFile);
+ }
+
+ if (!jars.isEmpty()) {
+ args.add(parser.JARS);
+ args.add(join(",", jars));
+ }
+
+ if (!files.isEmpty()) {
+ args.add(parser.FILES);
+ args.add(join(",", files));
+ }
+
+ if (!pyFiles.isEmpty()) {
+ args.add(parser.PY_FILES);
+ args.add(join(",", pyFiles));
+ }
+
+ if (mainClass != null) {
+ args.add(parser.CLASS);
+ args.add(mainClass);
+ }
+
+ args.addAll(sparkArgs);
+ if (appResource != null) {
+ args.add(appResource);
+ }
+ args.addAll(appArgs);
+
+ return args;
+ }
+
+ private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOException {
+ // Load the properties file and check whether spark-submit will be running the app's driver
+ // or just launching a cluster app. When running the driver, the JVM's argument will be
+ // modified to cover the driver's configuration.
+ Properties props = loadPropertiesFile();
+ boolean isClientMode = isClientMode(props);
+ String extraClassPath = isClientMode ?
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+
+ List<String> cmd = buildJavaCommand(extraClassPath);
+ addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
+ addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
+
+ if (isClientMode) {
+ // Figuring out where the memory value comes from is a little tricky due to precedence.
+ // Precedence is observed in the following order:
+ // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
+ // - properties file.
+ // - SPARK_DRIVER_MEMORY env variable
+ // - SPARK_MEM env variable
+ // - default value (512m)
+ String memory = firstNonEmpty(firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+ System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
+ cmd.add("-Xms" + memory);
+ cmd.add("-Xmx" + memory);
+ addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+ mergeEnvPathList(env, getLibPathEnvName(),
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ }
+
+ addPermGenSizeOpt(cmd);
+ cmd.add("org.apache.spark.deploy.SparkSubmit");
+ cmd.addAll(buildSparkSubmitArgs());
+ return cmd;
+ }
+
+ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
+ // For backwards compatibility, if a script is specified in
+ // the pyspark command line, then run it using spark-submit.
+ if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
+ System.err.println(
+ "WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" +
+ "Use ./bin/spark-submit <python file>");
+ appResource = appArgs.get(0);
+ appArgs.remove(0);
+ return buildCommand(env);
+ }
+
+ // When launching the pyspark shell, the spark-submit arguments should be stored in the
+ // PYSPARK_SUBMIT_ARGS env variable. The executable is taken from the PYSPARK_DRIVER_PYTHON
+ // env variable set by the pyspark script, followed by the options in PYSPARK_DRIVER_PYTHON_OPTS.
+ checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");
+
+ Properties props = loadPropertiesFile();
+ mergeEnvPathList(env, getLibPathEnvName(),
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+
+ // Store spark-submit arguments in an environment variable, since there's no way to pass
+ // them to shell.py on the command line.
+ StringBuilder submitArgs = new StringBuilder();
+ for (String arg : buildSparkSubmitArgs()) {
+ if (submitArgs.length() > 0) {
+ submitArgs.append(" ");
+ }
+ submitArgs.append(quoteForPython(arg));
+ }
+ env.put("PYSPARK_SUBMIT_ARGS", submitArgs.toString());
+
+ List<String> pyargs = new ArrayList<String>();
+ pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+ String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+ if (!isEmpty(pyOpts)) {
+ pyargs.addAll(parseOptionString(pyOpts));
+ }
+
+ return pyargs;
+ }
+
+ private boolean isClientMode(Properties userProps) {
+ String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
+ // Default master is "local[*]", so assume client mode in that case.
+ return userMaster == null ||
+ "client".equals(deployMode) ||
+ (!userMaster.equals("yarn-cluster") && deployMode == null);
+ }
+
+ private class OptionParser extends SparkSubmitOptionParser {
+
+ private final List<String> driverJvmKeys = Arrays.asList(
+ SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+ SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
+ SparkLauncher.DRIVER_MEMORY);
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ if (opt.equals(MASTER)) {
+ master = value;
+ } else if (opt.equals(DEPLOY_MODE)) {
+ deployMode = value;
+ } else if (opt.equals(PROPERTIES_FILE)) {
+ propertiesFile = value;
+ } else if (opt.equals(DRIVER_MEMORY)) {
+ conf.put(SparkLauncher.DRIVER_MEMORY, value);
+ } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+ } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+ } else if (opt.equals(DRIVER_CLASS_PATH)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+ } else if (opt.equals(CONF)) {
+ String[] setConf = value.split("=", 2);
+ checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
+ if (driverJvmKeys.contains(setConf[0])) {
+ conf.put(setConf[0], setConf[1]);
+ }
+ } else if (opt.equals(CLASS)) {
+ // The special classes require some special command line handling, since they allow
+ // mixing spark-submit arguments with arguments that should be propagated to the shell
+ // itself. Note that for this to work, the "--class" argument must come before any
+ // non-spark-submit arguments.
+ mainClass = value;
+ if (specialClasses.containsKey(value)) {
+ allowsMixedArguments = true;
+ appResource = specialClasses.get(value);
+ }
+ } else {
+ sparkArgs.add(opt);
+ if (value != null) {
+ sparkArgs.add(value);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
+ // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
+ // parameter itself will be interpreted by SparkSubmit as the application resource. The
+ // remaining params will be appended to the list of SparkSubmit arguments.
+ if (allowsMixedArguments) {
+ appArgs.add(opt);
+ return true;
+ } else {
+ sparkArgs.add(opt);
+ return false;
+ }
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+ for (String arg : extra) {
+ sparkArgs.add(arg);
+ }
+ }
+
+ }
+
+}
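
A rough, self-contained sketch of the PYSPARK_SUBMIT_ARGS handling in buildPySparkShellCommand() above. The quote() helper is only a stand-in for CommandBuilderUtils.quoteForPython (its behavior is inferred from the quoting tests added later in this patch), and the argument list is made up for illustration; it is not real builder output.

    import java.util.Arrays;
    import java.util.List;

    public class PySparkSubmitArgsSketch {

      // Stand-in for CommandBuilderUtils.quoteForPython: wrap in double quotes and escape any
      // embedded double quotes, mirroring the expectations in CommandBuilderUtilsSuite.
      static String quote(String arg) {
        return "\"" + arg.replace("\"", "\\\"") + "\"";
      }

      public static void main(String[] args) {
        // Hypothetical stand-in for what buildSparkSubmitArgs() could return for the pyspark
        // shell; the real list ends with the PYSPARK_SHELL_RESOURCE constant.
        List<String> submitArgs = Arrays.asList("--master", "local[*]", "--name", "my shell");

        StringBuilder value = new StringBuilder();
        for (String arg : submitArgs) {
          if (value.length() > 0) {
            value.append(" ");
          }
          value.append(quote(arg));
        }

        // This single string is what the builder stores in PYSPARK_SUBMIT_ARGS for shell.py.
        System.out.println(value);
        // Prints: "--master" "local[*]" "--name" "my shell"
      }
    }
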
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
new file mode 100644
index 0000000000..8526d2e7cf
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parser for spark-submit command line options.
+ * <p/>
+ * This class encapsulates the parsing code for spark-submit command line options, so that there
+ * is a single list of options that needs to be maintained (well, sort of, but it makes it harder
+ * to break things).
+ */
+class SparkSubmitOptionParser {
+
+ // The following constants define the "main" name for the available options. They're defined
+ // to avoid copy & paste of the raw strings where they're needed.
+ //
+ // The fields are not static so that they're exposed to Scala code that uses this class. See
+ // SparkSubmitArguments.scala. That is also why this class is not abstract - to allow code to
+ // easily use these constants without having to create dummy implementations of this class.
+ protected final String CLASS = "--class";
+ protected final String CONF = "--conf";
+ protected final String DEPLOY_MODE = "--deploy-mode";
+ protected final String DRIVER_CLASS_PATH = "--driver-class-path";
+ protected final String DRIVER_CORES = "--driver-cores";
+ protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
+ protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
+ protected final String DRIVER_MEMORY = "--driver-memory";
+ protected final String EXECUTOR_MEMORY = "--executor-memory";
+ protected final String FILES = "--files";
+ protected final String JARS = "--jars";
+ protected final String KILL_SUBMISSION = "--kill";
+ protected final String MASTER = "--master";
+ protected final String NAME = "--name";
+ protected final String PACKAGES = "--packages";
+ protected final String PROPERTIES_FILE = "--properties-file";
+ protected final String PROXY_USER = "--proxy-user";
+ protected final String PY_FILES = "--py-files";
+ protected final String REPOSITORIES = "--repositories";
+ protected final String STATUS = "--status";
+ protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
+
+ // Options that do not take arguments.
+ protected final String HELP = "--help";
+ protected final String SUPERVISE = "--supervise";
+ protected final String VERBOSE = "--verbose";
+ protected final String VERSION = "--version";
+
+ // Standalone-only options.
+
+ // YARN-only options.
+ protected final String ARCHIVES = "--archives";
+ protected final String EXECUTOR_CORES = "--executor-cores";
+ protected final String QUEUE = "--queue";
+ protected final String NUM_EXECUTORS = "--num-executors";
+
+ /**
+ * This is the canonical list of spark-submit options. Each entry in the array contains the
+ * different aliases for the same option; the first element of each entry is the "official"
+ * name of the option, passed to {@link #handle(String, String)}.
+ * <p/>
+ * Options listed neither here nor in the "switch" list below will result in a call to
+ * {@link #handleUnknown(String)}.
+ * <p/>
+ * These two arrays are visible for tests.
+ */
+ final String[][] opts = {
+ { ARCHIVES },
+ { CLASS },
+ { CONF, "-c" },
+ { DEPLOY_MODE },
+ { DRIVER_CLASS_PATH },
+ { DRIVER_CORES },
+ { DRIVER_JAVA_OPTIONS },
+ { DRIVER_LIBRARY_PATH },
+ { DRIVER_MEMORY },
+ { EXECUTOR_CORES },
+ { EXECUTOR_MEMORY },
+ { FILES },
+ { JARS },
+ { KILL_SUBMISSION },
+ { MASTER },
+ { NAME },
+ { NUM_EXECUTORS },
+ { PACKAGES },
+ { PROPERTIES_FILE },
+ { PROXY_USER },
+ { PY_FILES },
+ { QUEUE },
+ { REPOSITORIES },
+ { STATUS },
+ { TOTAL_EXECUTOR_CORES },
+ };
+
+ /**
+ * List of switches (command line options that do not take parameters) recognized by spark-submit.
+ */
+ final String[][] switches = {
+ { HELP, "-h" },
+ { SUPERVISE },
+ { VERBOSE, "-v" },
+ { VERSION },
+ };
+
+ /**
+ * Parse a list of spark-submit command line options.
+ * <p/>
+ * See SparkSubmitArguments.scala for a more formal description of available options.
+ *
+ * @throws IllegalArgumentException If an error is found during parsing.
+ */
+ protected final void parse(List<String> args) {
+ Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
+
+ int idx = 0;
+ for (idx = 0; idx < args.size(); idx++) {
+ String arg = args.get(idx);
+ String value = null;
+
+ Matcher m = eqSeparatedOpt.matcher(arg);
+ if (m.matches()) {
+ arg = m.group(1);
+ value = m.group(2);
+ }
+
+ // Look for options with a value.
+ String name = findCliOption(arg, opts);
+ if (name != null) {
+ if (value == null) {
+ if (idx == args.size() - 1) {
+ throw new IllegalArgumentException(
+ String.format("Missing argument for option '%s'.", arg));
+ }
+ idx++;
+ value = args.get(idx);
+ }
+ if (!handle(name, value)) {
+ break;
+ }
+ continue;
+ }
+
+ // Look for a switch.
+ name = findCliOption(arg, switches);
+ if (name != null) {
+ if (!handle(name, null)) {
+ break;
+ }
+ continue;
+ }
+
+ if (!handleUnknown(arg)) {
+ break;
+ }
+ }
+
+ if (idx < args.size()) {
+ idx++;
+ }
+ handleExtraArgs(args.subList(idx, args.size()));
+ }
+
+ /**
+ * Callback for when a recognized option is parsed.
+ *
+ * @param opt The long name of the CLI option (might differ from the actual command line).
+ * @param value The value. This will be <i>null</i> if the option does not take a value.
+ * @return Whether to continue parsing the argument list.
+ */
+ protected boolean handle(String opt, String value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Callback for when an unrecognized option is parsed.
+ *
+ * @param opt Unrecognized option from the command line.
+ * @return Whether to continue parsing the argument list.
+ */
+ protected boolean handleUnknown(String opt) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Callback for remaining command line arguments after either {@link #handle(String, String)} or
+ * {@link #handleUnknown(String)} returns "false". This will be called at the end of parsing even
+ * when there are no remaining arguments.
+ *
+ * @param extra List of remaining arguments.
+ */
+ protected void handleExtraArgs(List<String> extra) {
+ throw new UnsupportedOperationException();
+ }
+
+ private String findCliOption(String name, String[][] available) {
+ for (String[] candidates : available) {
+ for (String candidate : candidates) {
+ if (candidate.equals(name)) {
+ return candidates[0];
+ }
+ }
+ }
+ return null;
+ }
+
+}
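
A minimal sketch of the callback contract defined above, showing which of handle(), handleUnknown(), and handleExtraArgs() fire for a sample command line. The LoggingParser name and the recorded strings are illustrative only; the real subclass in this patch is the OptionParser inside SparkSubmitCommandBuilder. The sketch has to live in org.apache.spark.launcher because SparkSubmitOptionParser is package-private.

    package org.apache.spark.launcher;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class LoggingParser extends SparkSubmitOptionParser {

      final List<String> events = new ArrayList<String>();

      @Override
      protected boolean handle(String opt, String value) {
        events.add("option " + opt + " = " + value);
        return true;
      }

      @Override
      protected boolean handleUnknown(String opt) {
        // Returning false ends option parsing; the remaining args go to handleExtraArgs().
        events.add("unknown " + opt);
        return false;
      }

      @Override
      protected void handleExtraArgs(List<String> extra) {
        events.add("extra " + extra);
      }

      public static void main(String[] args) {
        LoggingParser parser = new LoggingParser();
        parser.parse(Arrays.asList("--master=local", "--verbose", "app.jar", "arg1"));
        // events now holds, in order:
        //   option --master = local
        //   option --verbose = null
        //   unknown app.jar
        //   extra [arg1]
        System.out.println(parser.events);
      }
    }
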
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
new file mode 100644
index 0000000000..7ed756f4b8
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Library for launching Spark applications.
+ * <p/>
+ * This library allows applications to launch Spark programmatically. There's only one entry
+ * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * <p/>
+ * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
+ * and configure the application to run. For example:
+ *
+ * <pre>
+ * {@code
+ * import org.apache.spark.launcher.SparkLauncher;
+ *
+ * public class MyLauncher {
+ * public static void main(String[] args) throws Exception {
+ * Process spark = new SparkLauncher()
+ * .setAppResource("/my/app.jar")
+ * .setMainClass("my.spark.app.Main")
+ * .setMaster("local")
+ * .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ * .launch();
+ * spark.waitFor();
+ * }
+ * }
+ * }
+ * </pre>
+ */
+package org.apache.spark.launcher;
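
One practical detail the example above glosses over: launch() returns a plain java.lang.Process, so the caller should drain its stdout and stderr, otherwise the child can block once an output pipe fills up (SparkLauncherSuite below uses a small daemon thread for the same reason). Below is a hedged variation on the javadoc example with output draining added; everything other than the SparkLauncher calls shown above is illustrative.

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    import org.apache.spark.launcher.SparkLauncher;

    public class MyLauncherWithOutput {

      public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
          .setAppResource("/my/app.jar")
          .setMainClass("my.spark.app.Main")
          .setMaster("local")
          .launch();

        // Drain stderr on a background thread and stdout on this one, then wait for the exit code.
        drainAsync(spark.getErrorStream());
        drain(spark.getInputStream());
        System.out.println("Spark app finished with exit code " + spark.waitFor());
      }

      private static void drainAsync(final InputStream in) {
        Thread t = new Thread(new Runnable() {
          @Override
          public void run() {
            drain(in);
          }
        });
        t.setDaemon(true);
        t.start();
      }

      private static void drain(InputStream in) {
        try {
          BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
          String line;
          while ((line = reader.readLine()) != null) {
            System.out.println(line);
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
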
diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
new file mode 100644
index 0000000000..dba0203867
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+public class CommandBuilderUtilsSuite {
+
+ @Test
+ public void testValidOptionStrings() {
+ testOpt("a b c d e", Arrays.asList("a", "b", "c", "d", "e"));
+ testOpt("a 'b c' \"d\" e", Arrays.asList("a", "b c", "d", "e"));
+ testOpt("a 'b\\\"c' \"'d'\" e", Arrays.asList("a", "b\\\"c", "'d'", "e"));
+ testOpt("a 'b\"c' \"\\\"d\\\"\" e", Arrays.asList("a", "b\"c", "\"d\"", "e"));
+ testOpt(" a b c \\\\ ", Arrays.asList("a", "b", "c", "\\"));
+
+ // Following tests ported from UtilsSuite.scala.
+ testOpt("", new ArrayList<String>());
+ testOpt("a", Arrays.asList("a"));
+ testOpt("aaa", Arrays.asList("aaa"));
+ testOpt("a b c", Arrays.asList("a", "b", "c"));
+ testOpt(" a b\t c ", Arrays.asList("a", "b", "c"));
+ testOpt("a 'b c'", Arrays.asList("a", "b c"));
+ testOpt("a 'b c' d", Arrays.asList("a", "b c", "d"));
+ testOpt("'b c'", Arrays.asList("b c"));
+ testOpt("a \"b c\"", Arrays.asList("a", "b c"));
+ testOpt("a \"b c\" d", Arrays.asList("a", "b c", "d"));
+ testOpt("\"b c\"", Arrays.asList("b c"));
+ testOpt("a 'b\" c' \"d' e\"", Arrays.asList("a", "b\" c", "d' e"));
+ testOpt("a\t'b\nc'\nd", Arrays.asList("a", "b\nc", "d"));
+ testOpt("a \"b\\\\c\"", Arrays.asList("a", "b\\c"));
+ testOpt("a \"b\\\"c\"", Arrays.asList("a", "b\"c"));
+ testOpt("a 'b\\\"c'", Arrays.asList("a", "b\\\"c"));
+ testOpt("'a'b", Arrays.asList("ab"));
+ testOpt("'a''b'", Arrays.asList("ab"));
+ testOpt("\"a\"b", Arrays.asList("ab"));
+ testOpt("\"a\"\"b\"", Arrays.asList("ab"));
+ testOpt("''", Arrays.asList(""));
+ testOpt("\"\"", Arrays.asList(""));
+ }
+
+ @Test
+ public void testInvalidOptionStrings() {
+ testInvalidOpt("\\");
+ testInvalidOpt("\"abcde");
+ testInvalidOpt("'abcde");
+ }
+
+ @Test
+ public void testWindowsBatchQuoting() {
+ assertEquals("abc", quoteForBatchScript("abc"));
+ assertEquals("\"a b c\"", quoteForBatchScript("a b c"));
+ assertEquals("\"a \"\"b\"\" c\"", quoteForBatchScript("a \"b\" c"));
+ assertEquals("\"a\"\"b\"\"c\"", quoteForBatchScript("a\"b\"c"));
+ assertEquals("\"ab^=\"\"cd\"\"\"", quoteForBatchScript("ab=\"cd\""));
+ }
+
+ @Test
+ public void testPythonArgQuoting() {
+ assertEquals("\"abc\"", quoteForPython("abc"));
+ assertEquals("\"a b c\"", quoteForPython("a b c"));
+ assertEquals("\"a \\\"b\\\" c\"", quoteForPython("a \"b\" c"));
+ }
+
+ private void testOpt(String opts, List<String> expected) {
+ assertEquals(String.format("test string failed to parse: [[ %s ]]", opts),
+ expected, parseOptionString(opts));
+ }
+
+ private void testInvalidOpt(String opts) {
+ try {
+ parseOptionString(opts);
+ fail("Expected exception for invalid option string.");
+ } catch (IllegalArgumentException e) {
+ // pass.
+ }
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
new file mode 100644
index 0000000000..252d5abae1
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * These tests require the Spark assembly to be built before they can be run.
+ */
+public class SparkLauncherSuite {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+
+ @Test
+ public void testChildProcLauncher() throws Exception {
+ Map<String, String> env = new HashMap<String, String>();
+ env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
+
+ SparkLauncher launcher = new SparkLauncher(env)
+ .setSparkHome(System.getProperty("spark.test.home"))
+ .setMaster("local")
+ .setAppResource("spark-internal")
+ .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .setMainClass(SparkLauncherTestApp.class.getName())
+ .addAppArgs("proc");
+ final Process app = launcher.launch();
+ new Redirector("stdout", app.getInputStream()).start();
+ new Redirector("stderr", app.getErrorStream()).start();
+ assertEquals(0, app.waitFor());
+ }
+
+ public static class SparkLauncherTestApp {
+
+ public static void main(String[] args) throws Exception {
+ assertEquals(1, args.length);
+ assertEquals("proc", args[0]);
+ assertEquals("bar", System.getProperty("foo"));
+ assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
+ }
+
+ }
+
+ private static class Redirector extends Thread {
+
+ private final InputStream in;
+
+ Redirector(String name, InputStream in) {
+ this.in = in;
+ setName(name);
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ LOG.warn(line);
+ }
+ } catch (Exception e) {
+ LOG.error("Error reading process output.", e);
+ }
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
new file mode 100644
index 0000000000..815edc4e49
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class SparkSubmitCommandBuilderSuite {
+
+ private static File dummyPropsFile;
+ private static SparkSubmitOptionParser parser;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ dummyPropsFile = File.createTempFile("spark", "properties");
+ parser = new SparkSubmitOptionParser();
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ dummyPropsFile.delete();
+ }
+
+ @Test
+ public void testDriverCmdBuilder() throws Exception {
+ testCmdBuilder(true);
+ }
+
+ @Test
+ public void testClusterCmdBuilder() throws Exception {
+ testCmdBuilder(false);
+ }
+
+ @Test
+ public void testCliParser() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.MASTER,
+ "local",
+ parser.DRIVER_MEMORY,
+ "42g",
+ parser.DRIVER_CLASS_PATH,
+ "/driverCp",
+ parser.DRIVER_JAVA_OPTIONS,
+ "extraJavaOpt",
+ parser.CONF,
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath");
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+ assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
+ File.pathSeparator, "/driverLibPath"));
+ assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, "/driverCp"));
+ assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms42g"));
+ assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
+ }
+
+ @Test
+ public void testShellCliParser() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.CLASS,
+ "org.apache.spark.repl.Main",
+ parser.MASTER,
+ "foo",
+ "--app-arg",
+ "bar",
+ "--app-switch",
+ parser.FILES,
+ "baz",
+ parser.NAME,
+ "appName");
+
+ List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
+ assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
+ }
+
+ @Test
+ public void testAlternateSyntaxParsing() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.CLASS + "=org.my.Class",
+ parser.MASTER + "=foo",
+ parser.DEPLOY_MODE + "=bar");
+
+ List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
+ assertEquals("foo", findArgValue(cmd, parser.MASTER));
+ assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
+ }
+
+ @Test
+ public void testPySparkLauncher() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ SparkSubmitCommandBuilder.PYSPARK_SHELL,
+ "--master=foo",
+ "--deploy-mode=bar");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+ assertEquals("python", cmd.get(cmd.size() - 1));
+ assertEquals(
+ String.format("\"%s\" \"foo\" \"%s\" \"bar\" \"%s\"",
+ parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.PYSPARK_SHELL_RESOURCE),
+ env.get("PYSPARK_SUBMIT_ARGS"));
+ }
+
+ @Test
+ public void testPySparkFallback() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ "--master=foo",
+ "--deploy-mode=bar",
+ "script.py",
+ "arg1");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+ assertEquals("foo", findArgValue(cmd, "--master"));
+ assertEquals("bar", findArgValue(cmd, "--deploy-mode"));
+ assertEquals("script.py", cmd.get(cmd.size() - 2));
+ assertEquals("arg1", cmd.get(cmd.size() - 1));
+ }
+
+ private void testCmdBuilder(boolean isDriver) throws Exception {
+ String deployMode = isDriver ? "client" : "cluster";
+
+ SparkSubmitCommandBuilder launcher =
+ new SparkSubmitCommandBuilder(Collections.<String>emptyList());
+ launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
+ System.getProperty("spark.test.home"));
+ launcher.master = "yarn";
+ launcher.deployMode = deployMode;
+ launcher.appResource = "/foo";
+ launcher.appName = "MyApp";
+ launcher.mainClass = "my.Class";
+ launcher.propertiesFile = dummyPropsFile.getAbsolutePath();
+ launcher.appArgs.add("foo");
+ launcher.appArgs.add("bar");
+ launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
+ launcher.conf.put("spark.foo", "foo");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = launcher.buildCommand(env);
+
+ // Checks below are different for driver and non-driver mode.
+
+ if (isDriver) {
+ assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms1g"));
+ assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
+ } else {
+ boolean found = false;
+ for (String arg : cmd) {
+ if (arg.startsWith("-Xms") || arg.startsWith("-Xmx")) {
+ found = true;
+ break;
+ }
+ }
+ assertFalse("Memory arguments should not be set.", found);
+ }
+
+ for (String arg : cmd) {
+ if (arg.startsWith("-XX:MaxPermSize=")) {
+ if (isDriver) {
+ assertEquals("-XX:MaxPermSize=256m", arg);
+ } else {
+ assertEquals("-XX:MaxPermSize=128m", arg);
+ }
+ }
+ }
+
+ String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
+ if (isDriver) {
+ assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
+ } else {
+ assertFalse("Driver classpath should not be in command.", contains("/driver", cp));
+ }
+
+ String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());
+ if (isDriver) {
+ assertNotNull("Native library path should be set.", libPath);
+ assertTrue("Native library path should contain provided entry.",
+ contains("/native", libPath.split(Pattern.quote(File.pathSeparator))));
+ } else {
+ assertNull("Native library should not be set.", libPath);
+ }
+
+ // Checks below are the same for both driver and non-driver mode.
+ assertEquals(dummyPropsFile.getAbsolutePath(), findArgValue(cmd, parser.PROPERTIES_FILE));
+ assertEquals("yarn", findArgValue(cmd, parser.MASTER));
+ assertEquals(deployMode, findArgValue(cmd, parser.DEPLOY_MODE));
+ assertEquals("my.Class", findArgValue(cmd, parser.CLASS));
+ assertEquals("MyApp", findArgValue(cmd, parser.NAME));
+
+ boolean appArgsOk = false;
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals("/foo")) {
+ assertEquals("foo", cmd.get(i + 1));
+ assertEquals("bar", cmd.get(i + 2));
+ assertEquals(cmd.size(), i + 3);
+ appArgsOk = true;
+ break;
+ }
+ }
+ assertTrue("App resource and args should be added to command.", appArgsOk);
+
+ Map<String, String> conf = parseConf(cmd, parser);
+ assertEquals("foo", conf.get("spark.foo"));
+ }
+
+ private boolean contains(String needle, String[] haystack) {
+ for (String entry : haystack) {
+ if (entry.equals(needle)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Map<String, String> parseConf(List<String> cmd, SparkSubmitOptionParser parser) {
+ Map<String, String> conf = new HashMap<String, String>();
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals(parser.CONF)) {
+ String[] val = cmd.get(i + 1).split("=", 2);
+ conf.put(val[0], val[1]);
+ i += 1;
+ }
+ }
+ return conf;
+ }
+
+ private String findArgValue(List<String> cmd, String name) {
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals(name)) {
+ return cmd.get(i + 1);
+ }
+ }
+ fail(String.format("arg '%s' not found", name));
+ return null;
+ }
+
+ private boolean findInStringList(String list, String sep, String needle) {
+ return contains(needle, list.split(sep));
+ }
+
+ private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+ SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
+ builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
+ return builder.buildCommand(env);
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
new file mode 100644
index 0000000000..f3d2109917
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import static org.apache.spark.launcher.SparkSubmitOptionParser.*;
+
+public class SparkSubmitOptionParserSuite {
+
+ private SparkSubmitOptionParser parser;
+
+ @Before
+ public void setUp() {
+ parser = spy(new DummyParser());
+ }
+
+ @Test
+ public void testAllOptions() {
+ int count = 0;
+ for (String[] optNames : parser.opts) {
+ for (String optName : optNames) {
+ String value = optName + "-value";
+ parser.parse(Arrays.asList(optName, value));
+ count++;
+ verify(parser).handle(eq(optNames[0]), eq(value));
+ verify(parser, times(count)).handle(anyString(), anyString());
+ verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+ }
+
+ for (String[] switchNames : parser.switches) {
+ int switchCount = 0;
+ for (String name : switchNames) {
+ parser.parse(Arrays.asList(name));
+ count++;
+ switchCount++;
+ verify(parser, times(switchCount)).handle(eq(switchNames[0]), same((String) null));
+ verify(parser, times(count)).handle(anyString(), any(String.class));
+ verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+ }
+ }
+
+ @Test
+ public void testExtraOptions() {
+ List<String> args = Arrays.asList(parser.MASTER, parser.MASTER, "foo", "bar");
+ parser.parse(args);
+ verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
+ verify(parser).handleUnknown(eq("foo"));
+ verify(parser).handleExtraArgs(eq(Arrays.asList("bar")));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testMissingArg() {
+ parser.parse(Arrays.asList(parser.MASTER));
+ }
+
+ @Test
+ public void testEqualSeparatedOption() {
+ List<String> args = Arrays.asList(parser.MASTER + "=" + parser.MASTER);
+ parser.parse(args);
+ verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
+ verify(parser).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+
+ private static class DummyParser extends SparkSubmitOptionParser {
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ return false;
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..00c20ad69c
--- /dev/null
+++ b/launcher/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+
+# Some tests will set "test.name" to avoid overwriting the main log file.
+log4j.appender.file.file=target/unit-tests${test.name}.log
+
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN