diff options
Diffstat (limited to 'launcher/src/main')
8 files changed, 1814 insertions, 0 deletions
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java new file mode 100644 index 0000000000..dc90e9e987 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.jar.JarFile; +import java.util.regex.Pattern; + +import static org.apache.spark.launcher.CommandBuilderUtils.*; + +/** + * Abstract Spark command builder that defines common functionality. + */ +abstract class AbstractCommandBuilder { + + boolean verbose; + String appName; + String appResource; + String deployMode; + String javaHome; + String mainClass; + String master; + String propertiesFile; + final List<String> appArgs; + final List<String> jars; + final List<String> files; + final List<String> pyFiles; + final Map<String, String> childEnv; + final Map<String, String> conf; + + public AbstractCommandBuilder() { + this.appArgs = new ArrayList<String>(); + this.childEnv = new HashMap<String, String>(); + this.conf = new HashMap<String, String>(); + this.files = new ArrayList<String>(); + this.jars = new ArrayList<String>(); + this.pyFiles = new ArrayList<String>(); + } + + /** + * Builds the command to execute. + * + * @param env A map containing environment variables for the child process. It may already contain + * entries defined by the user (such as SPARK_HOME, or those defined by the + * SparkLauncher constructor that takes an environment), and may be modified to + * include other variables needed by the process to be executed. + */ + abstract List<String> buildCommand(Map<String, String> env) throws IOException; + + /** + * Builds a list of arguments to run java. + * + * This method finds the java executable to use and appends JVM-specific options for running a + * class with Spark in the classpath. It also loads options from the "java-opts" file in the + * configuration directory being used. + * + * Callers should still add at least the class to run, as well as any arguments to pass to the + * class. + */ + List<String> buildJavaCommand(String extraClassPath) throws IOException { + List<String> cmd = new ArrayList<String>(); + if (javaHome == null) { + cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java")); + } else { + cmd.add(join(File.separator, javaHome, "bin", "java")); + } + + // Load extra JAVA_OPTS from conf/java-opts, if it exists. + File javaOpts = new File(join(File.separator, getConfDir(), "java-opts")); + if (javaOpts.isFile()) { + BufferedReader br = new BufferedReader(new InputStreamReader( + new FileInputStream(javaOpts), "UTF-8")); + try { + String line; + while ((line = br.readLine()) != null) { + addOptionString(cmd, line); + } + } finally { + br.close(); + } + } + + cmd.add("-cp"); + cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath))); + return cmd; + } + + /** + * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't + * set it. + */ + void addPermGenSizeOpt(List<String> cmd) { + // Don't set MaxPermSize for Java 8 and later. + String[] version = System.getProperty("java.version").split("\\."); + if (Integer.parseInt(version[0]) > 1 || Integer.parseInt(version[1]) > 7) { + return; + } + + for (String arg : cmd) { + if (arg.startsWith("-XX:MaxPermSize=")) { + return; + } + } + + cmd.add("-XX:MaxPermSize=128m"); + } + + void addOptionString(List<String> cmd, String options) { + if (!isEmpty(options)) { + for (String opt : parseOptionString(options)) { + cmd.add(opt); + } + } + } + + /** + * Builds the classpath for the application. Returns a list with one classpath entry per element; + * each entry is formatted in the way expected by <i>java.net.URLClassLoader</i> (more + * specifically, with trailing slashes for directories). + */ + List<String> buildClassPath(String appClassPath) throws IOException { + String sparkHome = getSparkHome(); + String scala = getScalaVersion(); + + List<String> cp = new ArrayList<String>(); + addToClassPath(cp, getenv("SPARK_CLASSPATH")); + addToClassPath(cp, appClassPath); + + addToClassPath(cp, getConfDir()); + + boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES")); + boolean isTesting = "1".equals(getenv("SPARK_TESTING")); + if (prependClasses || isTesting) { + List<String> projects = Arrays.asList("core", "repl", "mllib", "bagel", "graphx", + "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver", + "yarn", "launcher"); + if (prependClasses) { + System.err.println( + "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " + + "assembly."); + for (String project : projects) { + addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project, + scala)); + } + } + if (isTesting) { + for (String project : projects) { + addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome, + project, scala)); + } + } + + // Add this path to include jars that are shaded in the final deliverable created during + // the maven build. These jars are copied to this directory during the build. + addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome)); + } + + String assembly = findAssembly(scala); + addToClassPath(cp, assembly); + + // When Hive support is needed, Datanucleus jars must be included on the classpath. Datanucleus + // jars do not work if only included in the uber jar as plugin.xml metadata is lost. Both sbt + // and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is built + // with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark + // assembly is built for Hive, before actually populating the CLASSPATH with the jars. + // + // This block also serves as a check for SPARK-1703, when the assembly jar is built with + // Java 7 and ends up with too many files, causing issues with other JDK versions. + boolean needsDataNucleus = false; + JarFile assemblyJar = null; + try { + assemblyJar = new JarFile(assembly); + needsDataNucleus = assemblyJar.getEntry("org/apache/hadoop/hive/ql/exec/") != null; + } catch (IOException ioe) { + if (ioe.getMessage().indexOf("invalid CEN header") >= 0) { + System.err.println( + "Loading Spark jar failed.\n" + + "This is likely because Spark was compiled with Java 7 and run\n" + + "with Java 6 (see SPARK-1703). Please use Java 7 to run Spark\n" + + "or build Spark with Java 6."); + System.exit(1); + } else { + throw ioe; + } + } finally { + if (assemblyJar != null) { + try { + assemblyJar.close(); + } catch (IOException e) { + // Ignore. + } + } + } + + if (needsDataNucleus) { + System.err.println("Spark assembly has been built with Hive, including Datanucleus jars " + + "in classpath."); + File libdir; + if (new File(sparkHome, "RELEASE").isFile()) { + libdir = new File(sparkHome, "lib"); + } else { + libdir = new File(sparkHome, "lib_managed/jars"); + } + + checkState(libdir.isDirectory(), "Library directory '%s' does not exist.", + libdir.getAbsolutePath()); + for (File jar : libdir.listFiles()) { + if (jar.getName().startsWith("datanucleus-")) { + addToClassPath(cp, jar.getAbsolutePath()); + } + } + } + + addToClassPath(cp, getenv("HADOOP_CONF_DIR")); + addToClassPath(cp, getenv("YARN_CONF_DIR")); + addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH")); + return cp; + } + + /** + * Adds entries to the classpath. + * + * @param cp List to which the new entries are appended. + * @param entries New classpath entries (separated by File.pathSeparator). + */ + private void addToClassPath(List<String> cp, String entries) { + if (isEmpty(entries)) { + return; + } + String[] split = entries.split(Pattern.quote(File.pathSeparator)); + for (String entry : split) { + if (!isEmpty(entry)) { + if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) { + entry += File.separator; + } + cp.add(entry); + } + } + } + + String getScalaVersion() { + String scala = getenv("SPARK_SCALA_VERSION"); + if (scala != null) { + return scala; + } + + String sparkHome = getSparkHome(); + File scala210 = new File(sparkHome, "assembly/target/scala-2.10"); + File scala211 = new File(sparkHome, "assembly/target/scala-2.11"); + checkState(!scala210.isDirectory() || !scala211.isDirectory(), + "Presence of build for both scala versions (2.10 and 2.11) detected.\n" + + "Either clean one of them or set SPARK_SCALA_VERSION in your environment."); + if (scala210.isDirectory()) { + return "2.10"; + } else { + checkState(scala211.isDirectory(), "Cannot find any assembly build directories."); + return "2.11"; + } + } + + String getSparkHome() { + String path = getenv(ENV_SPARK_HOME); + checkState(path != null, + "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); + return path; + } + + /** + * Loads the configuration file for the application, if it exists. This is either the + * user-specified properties file, or the spark-defaults.conf file under the Spark configuration + * directory. + */ + Properties loadPropertiesFile() throws IOException { + Properties props = new Properties(); + File propsFile; + if (propertiesFile != null) { + propsFile = new File(propertiesFile); + checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile); + } else { + propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE); + } + + if (propsFile.isFile()) { + FileInputStream fd = null; + try { + fd = new FileInputStream(propsFile); + props.load(new InputStreamReader(fd, "UTF-8")); + } finally { + if (fd != null) { + try { + fd.close(); + } catch (IOException e) { + // Ignore. + } + } + } + } + + return props; + } + + String getenv(String key) { + return firstNonEmpty(childEnv.get(key), System.getenv(key)); + } + + private String findAssembly(String scalaVersion) { + String sparkHome = getSparkHome(); + File libdir; + if (new File(sparkHome, "RELEASE").isFile()) { + libdir = new File(sparkHome, "lib"); + checkState(libdir.isDirectory(), "Library directory '%s' does not exist.", + libdir.getAbsolutePath()); + } else { + libdir = new File(sparkHome, String.format("assembly/target/scala-%s", scalaVersion)); + } + + final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar"); + FileFilter filter = new FileFilter() { + @Override + public boolean accept(File file) { + return file.isFile() && re.matcher(file.getName()).matches(); + } + }; + File[] assemblies = libdir.listFiles(filter); + checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir); + checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir); + return assemblies[0].getAbsolutePath(); + } + + private String getConfDir() { + String confDir = getenv("SPARK_CONF_DIR"); + return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf"); + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java new file mode 100644 index 0000000000..9b04732afe --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Helper methods for command builders. + */ +class CommandBuilderUtils { + + static final String DEFAULT_MEM = "512m"; + static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf"; + static final String ENV_SPARK_HOME = "SPARK_HOME"; + + /** Returns whether the given string is null or empty. */ + static boolean isEmpty(String s) { + return s == null || s.isEmpty(); + } + + /** Joins a list of strings using the given separator. */ + static String join(String sep, String... elements) { + StringBuilder sb = new StringBuilder(); + for (String e : elements) { + if (e != null) { + if (sb.length() > 0) { + sb.append(sep); + } + sb.append(e); + } + } + return sb.toString(); + } + + /** Joins a list of strings using the given separator. */ + static String join(String sep, Iterable<String> elements) { + StringBuilder sb = new StringBuilder(); + for (String e : elements) { + if (e != null) { + if (sb.length() > 0) { + sb.append(sep); + } + sb.append(e); + } + } + return sb.toString(); + } + + /** + * Returns the first non-empty value mapped to the given key in the given maps, or null otherwise. + */ + static String firstNonEmptyValue(String key, Map<?, ?>... maps) { + for (Map<?, ?> map : maps) { + String value = (String) map.get(key); + if (!isEmpty(value)) { + return value; + } + } + return null; + } + + /** Returns the first non-empty, non-null string in the given list, or null otherwise. */ + static String firstNonEmpty(String... candidates) { + for (String s : candidates) { + if (!isEmpty(s)) { + return s; + } + } + return null; + } + + /** Returns the name of the env variable that holds the native library path. */ + static String getLibPathEnvName() { + if (isWindows()) { + return "PATH"; + } + + String os = System.getProperty("os.name"); + if (os.startsWith("Mac OS X")) { + return "DYLD_LIBRARY_PATH"; + } else { + return "LD_LIBRARY_PATH"; + } + } + + /** Returns whether the OS is Windows. */ + static boolean isWindows() { + String os = System.getProperty("os.name"); + return os.startsWith("Windows"); + } + + /** + * Updates the user environment, appending the given pathList to the existing value of the given + * environment variable (or setting it if it hasn't yet been set). + */ + static void mergeEnvPathList(Map<String, String> userEnv, String envKey, String pathList) { + if (!isEmpty(pathList)) { + String current = firstNonEmpty(userEnv.get(envKey), System.getenv(envKey)); + userEnv.put(envKey, join(File.pathSeparator, current, pathList)); + } + } + + /** + * Parse a string as if it were a list of arguments, following bash semantics. + * For example: + * + * Input: "\"ab cd\" efgh 'i \" j'" + * Output: [ "ab cd", "efgh", "i \" j" ] + */ + static List<String> parseOptionString(String s) { + List<String> opts = new ArrayList<String>(); + StringBuilder opt = new StringBuilder(); + boolean inOpt = false; + boolean inSingleQuote = false; + boolean inDoubleQuote = false; + boolean escapeNext = false; + + // This is needed to detect when a quoted empty string is used as an argument ("" or ''). + boolean hasData = false; + + for (int i = 0; i < s.length(); i++) { + int c = s.codePointAt(i); + if (escapeNext) { + opt.appendCodePoint(c); + escapeNext = false; + } else if (inOpt) { + switch (c) { + case '\\': + if (inSingleQuote) { + opt.appendCodePoint(c); + } else { + escapeNext = true; + } + break; + case '\'': + if (inDoubleQuote) { + opt.appendCodePoint(c); + } else { + inSingleQuote = !inSingleQuote; + } + break; + case '"': + if (inSingleQuote) { + opt.appendCodePoint(c); + } else { + inDoubleQuote = !inDoubleQuote; + } + break; + default: + if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) { + opt.appendCodePoint(c); + } else { + opts.add(opt.toString()); + opt.setLength(0); + inOpt = false; + hasData = false; + } + } + } else { + switch (c) { + case '\'': + inSingleQuote = true; + inOpt = true; + hasData = true; + break; + case '"': + inDoubleQuote = true; + inOpt = true; + hasData = true; + break; + case '\\': + escapeNext = true; + inOpt = true; + hasData = true; + break; + default: + if (!Character.isWhitespace(c)) { + inOpt = true; + hasData = true; + opt.appendCodePoint(c); + } + } + } + } + + checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s); + if (hasData) { + opts.add(opt.toString()); + } + return opts; + } + + /** Throws IllegalArgumentException if the given object is null. */ + static void checkNotNull(Object o, String arg) { + if (o == null) { + throw new IllegalArgumentException(String.format("'%s' must not be null.", arg)); + } + } + + /** Throws IllegalArgumentException with the given message if the check is false. */ + static void checkArgument(boolean check, String msg, Object... args) { + if (!check) { + throw new IllegalArgumentException(String.format(msg, args)); + } + } + + /** Throws IllegalStateException with the given message if the check is false. */ + static void checkState(boolean check, String msg, Object... args) { + if (!check) { + throw new IllegalStateException(String.format(msg, args)); + } + } + + /** + * Quote a command argument for a command to be run by a Windows batch script, if the argument + * needs quoting. Arguments only seem to need quotes in batch scripts if they have certain + * special characters, some of which need extra (and different) escaping. + * + * For example: + * original single argument: ab="cde fgh" + * quoted: "ab^=""cde fgh""" + */ + static String quoteForBatchScript(String arg) { + + boolean needsQuotes = false; + for (int i = 0; i < arg.length(); i++) { + int c = arg.codePointAt(i); + if (Character.isWhitespace(c) || c == '"' || c == '=') { + needsQuotes = true; + break; + } + } + if (!needsQuotes) { + return arg; + } + StringBuilder quoted = new StringBuilder(); + quoted.append("\""); + for (int i = 0; i < arg.length(); i++) { + int cp = arg.codePointAt(i); + switch (cp) { + case '"': + quoted.append('"'); + break; + + case '=': + quoted.append('^'); + break; + + default: + break; + } + quoted.appendCodePoint(cp); + } + quoted.append("\""); + return quoted.toString(); + } + + /** + * Quotes a string so that it can be used in a command string and be parsed back into a single + * argument by python's "shlex.split()" function. + * + * Basically, just add simple escapes. E.g.: + * original single argument : ab "cd" ef + * after: "ab \"cd\" ef" + */ + static String quoteForPython(String s) { + StringBuilder quoted = new StringBuilder().append('"'); + for (int i = 0; i < s.length(); i++) { + int cp = s.codePointAt(i); + if (cp == '"' || cp == '\\') { + quoted.appendCodePoint('\\'); + } + quoted.appendCodePoint(cp); + } + return quoted.append('"').toString(); + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java new file mode 100644 index 0000000000..206acfb514 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.spark.launcher.CommandBuilderUtils.*; + +/** + * Command line interface for the Spark launcher. Used internally by Spark scripts. + */ +class Main { + + /** + * Usage: Main [class] [class args] + * <p/> + * This CLI works in two different modes: + * <ul> + * <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the + * {@link SparkLauncher} class is used to launch a Spark application.</li> + * <li>"spark-class": if another class is provided, an internal Spark class is run.</li> + * </ul> + * + * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and + * "bin/spark-class2.cmd" batch script on Windows to execute the final command. + * <p/> + * On Unix-like systems, the output is a list of command arguments, separated by the NULL + * character. On Windows, the output is a command line suitable for direct execution from the + * script. + */ + public static void main(String[] argsArray) throws Exception { + checkArgument(argsArray.length > 0, "Not enough arguments: missing class name."); + + List<String> args = new ArrayList<String>(Arrays.asList(argsArray)); + String className = args.remove(0); + + boolean printLaunchCommand; + boolean printUsage; + AbstractCommandBuilder builder; + try { + if (className.equals("org.apache.spark.deploy.SparkSubmit")) { + builder = new SparkSubmitCommandBuilder(args); + } else { + builder = new SparkClassCommandBuilder(className, args); + } + printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND")); + printUsage = false; + } catch (IllegalArgumentException e) { + builder = new UsageCommandBuilder(e.getMessage()); + printLaunchCommand = false; + printUsage = true; + } + + Map<String, String> env = new HashMap<String, String>(); + List<String> cmd = builder.buildCommand(env); + if (printLaunchCommand) { + System.err.println("Spark Command: " + join(" ", cmd)); + System.err.println("========================================"); + } + + if (isWindows()) { + // When printing the usage message, we can't use "cmd /v" since that prevents the env + // variable from being seen in the caller script. So do not call prepareWindowsCommand(). + if (printUsage) { + System.out.println(join(" ", cmd)); + } else { + System.out.println(prepareWindowsCommand(cmd, env)); + } + } else { + // In bash, use NULL as the arg separator since it cannot be used in an argument. + List<String> bashCmd = prepareBashCommand(cmd, env); + for (String c : bashCmd) { + System.out.print(c); + System.out.print('\0'); + } + } + } + + /** + * Prepare a command line for execution from a Windows batch script. + * + * The method quotes all arguments so that spaces are handled as expected. Quotes within arguments + * are "double quoted" (which is batch for escaping a quote). This page has more details about + * quoting and other batch script fun stuff: http://ss64.com/nt/syntax-esc.html + * + * The command is executed using "cmd /c" and formatted in single line, since that's the + * easiest way to consume this from a batch script (see spark-class2.cmd). + */ + private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) { + StringBuilder cmdline = new StringBuilder("cmd /c \""); + for (Map.Entry<String, String> e : childEnv.entrySet()) { + cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue())); + cmdline.append(" && "); + } + for (String arg : cmd) { + cmdline.append(quoteForBatchScript(arg)); + cmdline.append(" "); + } + cmdline.append("\""); + return cmdline.toString(); + } + + /** + * Prepare the command for execution from a bash script. The final command will have commands to + * set up any needed environment variables needed by the child process. + */ + private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) { + if (childEnv.isEmpty()) { + return cmd; + } + + List<String> newCmd = new ArrayList<String>(); + newCmd.add("env"); + + for (Map.Entry<String, String> e : childEnv.entrySet()) { + newCmd.add(String.format("%s=%s", e.getKey(), e.getValue())); + } + newCmd.addAll(cmd); + return newCmd; + } + + /** + * Internal builder used when command line parsing fails. This will behave differently depending + * on the platform: + * + * - On Unix-like systems, it will print a call to the "usage" function with two arguments: the + * the error string, and the exit code to use. The function is expected to print the command's + * usage and exit with the provided exit code. The script should use "export -f usage" after + * declaring a function called "usage", so that the function is available to downstream scripts. + * + * - On Windows it will set the variable "SPARK_LAUNCHER_USAGE_ERROR" to the usage error message. + * The batch script should check for this variable and print its usage, since batch scripts + * don't really support the "export -f" functionality used in bash. + */ + private static class UsageCommandBuilder extends AbstractCommandBuilder { + + private final String message; + + UsageCommandBuilder(String message) { + this.message = message; + } + + @Override + public List<String> buildCommand(Map<String, String> env) { + if (isWindows()) { + return Arrays.asList("set", "SPARK_LAUNCHER_USAGE_ERROR=" + message); + } else { + return Arrays.asList("usage", message, "1"); + } + } + + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java new file mode 100644 index 0000000000..e601a0a19f --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.apache.spark.launcher.CommandBuilderUtils.*; + +/** + * Command builder for internal Spark classes. + * <p/> + * This class handles building the command to launch all internal Spark classes except for + * SparkSubmit (which is handled by {@link SparkSubmitCommandBuilder} class. + */ +class SparkClassCommandBuilder extends AbstractCommandBuilder { + + private final String className; + private final List<String> classArgs; + + SparkClassCommandBuilder(String className, List<String> classArgs) { + this.className = className; + this.classArgs = classArgs; + } + + @Override + public List<String> buildCommand(Map<String, String> env) throws IOException { + List<String> javaOptsKeys = new ArrayList<String>(); + String memKey = null; + String extraClassPath = null; + + // Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + + // SPARK_DAEMON_MEMORY. + if (className.equals("org.apache.spark.deploy.master.Master")) { + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_MASTER_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + } else if (className.equals("org.apache.spark.deploy.worker.Worker")) { + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_WORKER_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) { + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_HISTORY_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) { + javaOptsKeys.add("SPARK_JAVA_OPTS"); + javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); + memKey = "SPARK_EXECUTOR_MEMORY"; + } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) { + javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); + memKey = "SPARK_EXECUTOR_MEMORY"; + } else if (className.startsWith("org.apache.spark.tools.")) { + String sparkHome = getSparkHome(); + File toolsDir = new File(join(File.separator, sparkHome, "tools", "target", + "scala-" + getScalaVersion())); + checkState(toolsDir.isDirectory(), "Cannot find tools build directory."); + + Pattern re = Pattern.compile("spark-tools_.*\\.jar"); + for (File f : toolsDir.listFiles()) { + if (re.matcher(f.getName()).matches()) { + extraClassPath = f.getAbsolutePath(); + break; + } + } + + checkState(extraClassPath != null, + "Failed to find Spark Tools Jar in %s.\n" + + "You need to run \"build/sbt tools/package\" before running %s.", + toolsDir.getAbsolutePath(), className); + + javaOptsKeys.add("SPARK_JAVA_OPTS"); + } + + List<String> cmd = buildJavaCommand(extraClassPath); + for (String key : javaOptsKeys) { + addOptionString(cmd, System.getenv(key)); + } + + String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM); + cmd.add("-Xms" + mem); + cmd.add("-Xmx" + mem); + addPermGenSizeOpt(cmd); + cmd.add(className); + cmd.addAll(classArgs); + return cmd; + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java new file mode 100644 index 0000000000..b566507ee6 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.spark.launcher.CommandBuilderUtils.*; + +/** + * Launcher for Spark applications. + * <p/> + * Use this class to start Spark applications programmatically. The class uses a builder pattern + * to allow clients to configure the Spark application and launch it as a child process. + */ +public class SparkLauncher { + + /** The Spark master. */ + public static final String SPARK_MASTER = "spark.master"; + + /** Configuration key for the driver memory. */ + public static final String DRIVER_MEMORY = "spark.driver.memory"; + /** Configuration key for the driver class path. */ + public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; + /** Configuration key for the driver VM options. */ + public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions"; + /** Configuration key for the driver native library path. */ + public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath"; + + /** Configuration key for the executor memory. */ + public static final String EXECUTOR_MEMORY = "spark.executor.memory"; + /** Configuration key for the executor class path. */ + public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; + /** Configuration key for the executor VM options. */ + public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"; + /** Configuration key for the executor native library path. */ + public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryOptions"; + /** Configuration key for the number of executor CPU cores. */ + public static final String EXECUTOR_CORES = "spark.executor.cores"; + + private final SparkSubmitCommandBuilder builder; + + public SparkLauncher() { + this(null); + } + + /** + * Creates a launcher that will set the given environment variables in the child. + * + * @param env Environment variables to set. + */ + public SparkLauncher(Map<String, String> env) { + this.builder = new SparkSubmitCommandBuilder(); + if (env != null) { + this.builder.childEnv.putAll(env); + } + } + + /** + * Set a custom JAVA_HOME for launching the Spark application. + * + * @param javaHome Path to the JAVA_HOME to use. + * @return This launcher. + */ + public SparkLauncher setJavaHome(String javaHome) { + checkNotNull(javaHome, "javaHome"); + builder.javaHome = javaHome; + return this; + } + + /** + * Set a custom Spark installation location for the application. + * + * @param sparkHome Path to the Spark installation to use. + * @return This launcher. + */ + public SparkLauncher setSparkHome(String sparkHome) { + checkNotNull(sparkHome, "sparkHome"); + builder.childEnv.put(ENV_SPARK_HOME, sparkHome); + return this; + } + + /** + * Set a custom properties file with Spark configuration for the application. + * + * @param path Path to custom properties file to use. + * @return This launcher. + */ + public SparkLauncher setPropertiesFile(String path) { + checkNotNull(path, "path"); + builder.propertiesFile = path; + return this; + } + + /** + * Set a single configuration value for the application. + * + * @param key Configuration key. + * @param value The value to use. + * @return This launcher. + */ + public SparkLauncher setConf(String key, String value) { + checkNotNull(key, "key"); + checkNotNull(value, "value"); + checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'"); + builder.conf.put(key, value); + return this; + } + + /** + * Set the application name. + * + * @param appName Application name. + * @return This launcher. + */ + public SparkLauncher setAppName(String appName) { + checkNotNull(appName, "appName"); + builder.appName = appName; + return this; + } + + /** + * Set the Spark master for the application. + * + * @param master Spark master. + * @return This launcher. + */ + public SparkLauncher setMaster(String master) { + checkNotNull(master, "master"); + builder.master = master; + return this; + } + + /** + * Set the deploy mode for the application. + * + * @param mode Deploy mode. + * @return This launcher. + */ + public SparkLauncher setDeployMode(String mode) { + checkNotNull(mode, "mode"); + builder.deployMode = mode; + return this; + } + + /** + * Set the main application resource. This should be the location of a jar file for Scala/Java + * applications, or a python script for PySpark applications. + * + * @param resource Path to the main application resource. + * @return This launcher. + */ + public SparkLauncher setAppResource(String resource) { + checkNotNull(resource, "resource"); + builder.appResource = resource; + return this; + } + + /** + * Sets the application class name for Java/Scala applications. + * + * @param mainClass Application's main class. + * @return This launcher. + */ + public SparkLauncher setMainClass(String mainClass) { + checkNotNull(mainClass, "mainClass"); + builder.mainClass = mainClass; + return this; + } + + /** + * Adds command line arguments for the application. + * + * @param args Arguments to pass to the application's main class. + * @return This launcher. + */ + public SparkLauncher addAppArgs(String... args) { + for (String arg : args) { + checkNotNull(arg, "arg"); + builder.appArgs.add(arg); + } + return this; + } + + /** + * Adds a jar file to be submitted with the application. + * + * @param jar Path to the jar file. + * @return This launcher. + */ + public SparkLauncher addJar(String jar) { + checkNotNull(jar, "jar"); + builder.jars.add(jar); + return this; + } + + /** + * Adds a file to be submitted with the application. + * + * @param file Path to the file. + * @return This launcher. + */ + public SparkLauncher addFile(String file) { + checkNotNull(file, "file"); + builder.files.add(file); + return this; + } + + /** + * Adds a python file / zip / egg to be submitted with the application. + * + * @param file Path to the file. + * @return This launcher. + */ + public SparkLauncher addPyFile(String file) { + checkNotNull(file, "file"); + builder.pyFiles.add(file); + return this; + } + + /** + * Enables verbose reporting for SparkSubmit. + * + * @param verbose Whether to enable verbose output. + * @return This launcher. + */ + public SparkLauncher setVerbose(boolean verbose) { + builder.verbose = verbose; + return this; + } + + /** + * Launches a sub-process that will start the configured Spark application. + * + * @return A process handle for the Spark app. + */ + public Process launch() throws IOException { + List<String> cmd = new ArrayList<String>(); + String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; + cmd.add(join(File.separator, builder.getSparkHome(), "bin", script)); + cmd.addAll(builder.buildSparkSubmitArgs()); + + // Since the child process is a batch script, let's quote things so that special characters are + // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are + // weird. + if (isWindows()) { + List<String> winCmd = new ArrayList<String>(); + for (String arg : cmd) { + winCmd.add(quoteForBatchScript(arg)); + } + cmd = winCmd; + } + + ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()])); + for (Map.Entry<String, String> e : builder.childEnv.entrySet()) { + pb.environment().put(e.getKey(), e.getValue()); + } + return pb.start(); + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java new file mode 100644 index 0000000000..6ffdff63d3 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.spark.launcher.CommandBuilderUtils.*; + +/** + * Special command builder for handling a CLI invocation of SparkSubmit. + * <p/> + * This builder adds command line parsing compatible with SparkSubmit. It handles setting + * driver-side options and special parsing behavior needed for the special-casing certain internal + * Spark applications. + * <p/> + * This class has also some special features to aid launching pyspark. + */ +class SparkSubmitCommandBuilder extends AbstractCommandBuilder { + + /** + * Name of the app resource used to identify the PySpark shell. The command line parser expects + * the resource name to be the very first argument to spark-submit in this case. + * + * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit + * (see java_gateway.py), and can cause this code to enter into an infinite loop. + */ + static final String PYSPARK_SHELL = "pyspark-shell-main"; + + /** + * This is the actual resource name that identifies the PySpark shell to SparkSubmit. + */ + static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell"; + + /** + * This map must match the class names for available special classes, since this modifies the way + * command line parsing works. This maps the class name to the resource to use when calling + * spark-submit. + */ + private static final Map<String, String> specialClasses = new HashMap<String, String>(); + static { + specialClasses.put("org.apache.spark.repl.Main", "spark-shell"); + specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver", + "spark-internal"); + specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", + "spark-internal"); + } + + private final List<String> sparkArgs; + + /** + * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed + * to parse the command lines for things like bin/spark-shell, which allows users to mix and + * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo"). + */ + private boolean allowsMixedArguments; + + SparkSubmitCommandBuilder() { + this.sparkArgs = new ArrayList<String>(); + } + + SparkSubmitCommandBuilder(List<String> args) { + this(); + List<String> submitArgs = args; + if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) { + this.allowsMixedArguments = true; + appResource = PYSPARK_SHELL_RESOURCE; + submitArgs = args.subList(1, args.size()); + } else { + this.allowsMixedArguments = false; + } + + new OptionParser().parse(submitArgs); + } + + @Override + public List<String> buildCommand(Map<String, String> env) throws IOException { + if (PYSPARK_SHELL_RESOURCE.equals(appResource)) { + return buildPySparkShellCommand(env); + } else { + return buildSparkSubmitCommand(env); + } + } + + List<String> buildSparkSubmitArgs() { + List<String> args = new ArrayList<String>(); + SparkSubmitOptionParser parser = new SparkSubmitOptionParser(); + + if (verbose) { + args.add(parser.VERBOSE); + } + + if (master != null) { + args.add(parser.MASTER); + args.add(master); + } + + if (deployMode != null) { + args.add(parser.DEPLOY_MODE); + args.add(deployMode); + } + + if (appName != null) { + args.add(parser.NAME); + args.add(appName); + } + + for (Map.Entry<String, String> e : conf.entrySet()) { + args.add(parser.CONF); + args.add(String.format("%s=%s", e.getKey(), e.getValue())); + } + + if (propertiesFile != null) { + args.add(parser.PROPERTIES_FILE); + args.add(propertiesFile); + } + + if (!jars.isEmpty()) { + args.add(parser.JARS); + args.add(join(",", jars)); + } + + if (!files.isEmpty()) { + args.add(parser.FILES); + args.add(join(",", files)); + } + + if (!pyFiles.isEmpty()) { + args.add(parser.PY_FILES); + args.add(join(",", pyFiles)); + } + + if (mainClass != null) { + args.add(parser.CLASS); + args.add(mainClass); + } + + args.addAll(sparkArgs); + if (appResource != null) { + args.add(appResource); + } + args.addAll(appArgs); + + return args; + } + + private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOException { + // Load the properties file and check whether spark-submit will be running the app's driver + // or just launching a cluster app. When running the driver, the JVM's argument will be + // modified to cover the driver's configuration. + Properties props = loadPropertiesFile(); + boolean isClientMode = isClientMode(props); + String extraClassPath = isClientMode ? + firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null; + + List<String> cmd = buildJavaCommand(extraClassPath); + addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS")); + addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS")); + + if (isClientMode) { + // Figuring out where the memory value come from is a little tricky due to precedence. + // Precedence is observed in the following order: + // - explicit configuration (setConf()), which also covers --driver-memory cli argument. + // - properties file. + // - SPARK_DRIVER_MEMORY env variable + // - SPARK_MEM env variable + // - default value (512m) + String memory = firstNonEmpty(firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props), + System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM); + cmd.add("-Xms" + memory); + cmd.add("-Xmx" + memory); + addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props)); + mergeEnvPathList(env, getLibPathEnvName(), + firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props)); + } + + addPermGenSizeOpt(cmd); + cmd.add("org.apache.spark.deploy.SparkSubmit"); + cmd.addAll(buildSparkSubmitArgs()); + return cmd; + } + + private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException { + // For backwards compatibility, if a script is specified in + // the pyspark command line, then run it using spark-submit. + if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) { + System.err.println( + "WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" + + "Use ./bin/spark-submit <python file>"); + appResource = appArgs.get(0); + appArgs.remove(0); + return buildCommand(env); + } + + // When launching the pyspark shell, the spark-submit arguments should be stored in the + // PYSPARK_SUBMIT_ARGS env variable. The executable is the PYSPARK_DRIVER_PYTHON env variable + // set by the pyspark script, followed by PYSPARK_DRIVER_PYTHON_OPTS. + checkArgument(appArgs.isEmpty(), "pyspark does not support any application options."); + + Properties props = loadPropertiesFile(); + mergeEnvPathList(env, getLibPathEnvName(), + firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props)); + + // Store spark-submit arguments in an environment variable, since there's no way to pass + // them to shell.py on the comand line. + StringBuilder submitArgs = new StringBuilder(); + for (String arg : buildSparkSubmitArgs()) { + if (submitArgs.length() > 0) { + submitArgs.append(" "); + } + submitArgs.append(quoteForPython(arg)); + } + env.put("PYSPARK_SUBMIT_ARGS", submitArgs.toString()); + + List<String> pyargs = new ArrayList<String>(); + pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python")); + String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); + if (!isEmpty(pyOpts)) { + pyargs.addAll(parseOptionString(pyOpts)); + } + + return pyargs; + } + + private boolean isClientMode(Properties userProps) { + String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER)); + // Default master is "local[*]", so assume client mode in that case. + return userMaster == null || + "client".equals(deployMode) || + (!userMaster.equals("yarn-cluster") && deployMode == null); + } + + private class OptionParser extends SparkSubmitOptionParser { + + private final List<String> driverJvmKeys = Arrays.asList( + SparkLauncher.DRIVER_EXTRA_CLASSPATH, + SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, + SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, + SparkLauncher.DRIVER_MEMORY); + + @Override + protected boolean handle(String opt, String value) { + if (opt.equals(MASTER)) { + master = value; + } else if (opt.equals(DEPLOY_MODE)) { + deployMode = value; + } else if (opt.equals(PROPERTIES_FILE)) { + propertiesFile = value; + } else if (opt.equals(DRIVER_MEMORY)) { + conf.put(SparkLauncher.DRIVER_MEMORY, value); + } else if (opt.equals(DRIVER_JAVA_OPTIONS)) { + conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); + } else if (opt.equals(DRIVER_LIBRARY_PATH)) { + conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); + } else if (opt.equals(DRIVER_CLASS_PATH)) { + conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value); + } else if (opt.equals(CONF)) { + String[] setConf = value.split("=", 2); + checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value); + if (driverJvmKeys.contains(setConf[0])) { + conf.put(setConf[0], setConf[1]); + } + } else if (opt.equals(CLASS)) { + // The special classes require some special command line handling, since they allow + // mixing spark-submit arguments with arguments that should be propagated to the shell + // itself. Note that for this to work, the "--class" argument must come before any + // non-spark-submit arguments. + mainClass = value; + if (specialClasses.containsKey(value)) { + allowsMixedArguments = true; + appResource = specialClasses.get(value); + } + } else { + sparkArgs.add(opt); + if (value != null) { + sparkArgs.add(value); + } + } + return true; + } + + @Override + protected boolean handleUnknown(String opt) { + // When mixing arguments, add unrecognized parameters directly to the user arguments list. In + // normal mode, any unrecognized parameter triggers the end of command line parsing, and the + // parameter itself will be interpreted by SparkSubmit as the application resource. The + // remaining params will be appended to the list of SparkSubmit arguments. + if (allowsMixedArguments) { + appArgs.add(opt); + return true; + } else { + sparkArgs.add(opt); + return false; + } + } + + @Override + protected void handleExtraArgs(List<String> extra) { + for (String arg : extra) { + sparkArgs.add(arg); + } + } + + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java new file mode 100644 index 0000000000..8526d2e7cf --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Parser for spark-submit command line options. + * <p/> + * This class encapsulates the parsing code for spark-submit command line options, so that there + * is a single list of options that needs to be maintained (well, sort of, but it makes it harder + * to break things). + */ +class SparkSubmitOptionParser { + + // The following constants define the "main" name for the available options. They're defined + // to avoid copy & paste of the raw strings where they're needed. + // + // The fields are not static so that they're exposed to Scala code that uses this class. See + // SparkSubmitArguments.scala. That is also why this class is not abstract - to allow code to + // easily use these constants without having to create dummy implementations of this class. + protected final String CLASS = "--class"; + protected final String CONF = "--conf"; + protected final String DEPLOY_MODE = "--deploy-mode"; + protected final String DRIVER_CLASS_PATH = "--driver-class-path"; + protected final String DRIVER_CORES = "--driver-cores"; + protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options"; + protected final String DRIVER_LIBRARY_PATH = "--driver-library-path"; + protected final String DRIVER_MEMORY = "--driver-memory"; + protected final String EXECUTOR_MEMORY = "--executor-memory"; + protected final String FILES = "--files"; + protected final String JARS = "--jars"; + protected final String KILL_SUBMISSION = "--kill"; + protected final String MASTER = "--master"; + protected final String NAME = "--name"; + protected final String PACKAGES = "--packages"; + protected final String PROPERTIES_FILE = "--properties-file"; + protected final String PROXY_USER = "--proxy-user"; + protected final String PY_FILES = "--py-files"; + protected final String REPOSITORIES = "--repositories"; + protected final String STATUS = "--status"; + protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores"; + + // Options that do not take arguments. + protected final String HELP = "--help"; + protected final String SUPERVISE = "--supervise"; + protected final String VERBOSE = "--verbose"; + protected final String VERSION = "--version"; + + // Standalone-only options. + + // YARN-only options. + protected final String ARCHIVES = "--archives"; + protected final String EXECUTOR_CORES = "--executor-cores"; + protected final String QUEUE = "--queue"; + protected final String NUM_EXECUTORS = "--num-executors"; + + /** + * This is the canonical list of spark-submit options. Each entry in the array contains the + * different aliases for the same option; the first element of each entry is the "official" + * name of the option, passed to {@link #handle(String, String)}. + * <p/> + * Options not listed here nor in the "switch" list below will result in a call to + * {@link $#handleUnknown(String)}. + * <p/> + * These two arrays are visible for tests. + */ + final String[][] opts = { + { ARCHIVES }, + { CLASS }, + { CONF, "-c" }, + { DEPLOY_MODE }, + { DRIVER_CLASS_PATH }, + { DRIVER_CORES }, + { DRIVER_JAVA_OPTIONS }, + { DRIVER_LIBRARY_PATH }, + { DRIVER_MEMORY }, + { EXECUTOR_CORES }, + { EXECUTOR_MEMORY }, + { FILES }, + { JARS }, + { KILL_SUBMISSION }, + { MASTER }, + { NAME }, + { NUM_EXECUTORS }, + { PACKAGES }, + { PROPERTIES_FILE }, + { PROXY_USER }, + { PY_FILES }, + { QUEUE }, + { REPOSITORIES }, + { STATUS }, + { TOTAL_EXECUTOR_CORES }, + }; + + /** + * List of switches (command line options that do not take parameters) recognized by spark-submit. + */ + final String[][] switches = { + { HELP, "-h" }, + { SUPERVISE }, + { VERBOSE, "-v" }, + { VERSION }, + }; + + /** + * Parse a list of spark-submit command line options. + * <p/> + * See SparkSubmitArguments.scala for a more formal description of available options. + * + * @throws IllegalArgumentException If an error is found during parsing. + */ + protected final void parse(List<String> args) { + Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)"); + + int idx = 0; + for (idx = 0; idx < args.size(); idx++) { + String arg = args.get(idx); + String value = null; + + Matcher m = eqSeparatedOpt.matcher(arg); + if (m.matches()) { + arg = m.group(1); + value = m.group(2); + } + + // Look for options with a value. + String name = findCliOption(arg, opts); + if (name != null) { + if (value == null) { + if (idx == args.size() - 1) { + throw new IllegalArgumentException( + String.format("Missing argument for option '%s'.", arg)); + } + idx++; + value = args.get(idx); + } + if (!handle(name, value)) { + break; + } + continue; + } + + // Look for a switch. + name = findCliOption(arg, switches); + if (name != null) { + if (!handle(name, null)) { + break; + } + continue; + } + + if (!handleUnknown(arg)) { + break; + } + } + + if (idx < args.size()) { + idx++; + } + handleExtraArgs(args.subList(idx, args.size())); + } + + /** + * Callback for when an option with an argument is parsed. + * + * @param opt The long name of the cli option (might differ from actual command line). + * @param value The value. This will be <i>null</i> if the option does not take a value. + * @return Whether to continue parsing the argument list. + */ + protected boolean handle(String opt, String value) { + throw new UnsupportedOperationException(); + } + + /** + * Callback for when an unrecognized option is parsed. + * + * @param opt Unrecognized option from the command line. + * @return Whether to continue parsing the argument list. + */ + protected boolean handleUnknown(String opt) { + throw new UnsupportedOperationException(); + } + + /** + * Callback for remaining command line arguments after either {@link #handle(String, String)} or + * {@link #handleUnknown(String)} return "false". This will be called at the end of parsing even + * when there are no remaining arguments. + * + * @param extra List of remaining arguments. + */ + protected void handleExtraArgs(List<String> extra) { + throw new UnsupportedOperationException(); + } + + private String findCliOption(String name, String[][] available) { + for (String[] candidates : available) { + for (String candidate : candidates) { + if (candidate.equals(name)) { + return candidates[0]; + } + } + } + return null; + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java new file mode 100644 index 0000000000..7ed756f4b8 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Library for launching Spark applications. + * <p/> + * This library allows applications to launch Spark programmatically. There's only one entry + * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class. + * <p/> + * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher} + * and configure the application to run. For example: + * + * <pre> + * {@code + * import org.apache.spark.launcher.SparkLauncher; + * + * public class MyLauncher { + * public static void main(String[] args) throws Exception { + * Process spark = new SparkLauncher() + * .setAppResource("/my/app.jar") + * .setMainClass("my.spark.app.Main") + * .setMaster("local") + * .setConf(SparkLauncher.DRIVER_MEMORY, "2g") + * .launch(); + * spark.waitFor(); + * } + * } + * } + * </pre> + */ +package org.apache.spark.launcher; |