diff options
Diffstat (limited to 'core/src/main')
6 files changed, 148 insertions, 301 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 82e66a3742..94e4bdbfb7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -18,18 +18,22 @@ package org.apache.spark.deploy import java.net.URI +import java.util.{List => JList} import java.util.jar.JarFile +import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.deploy.SparkSubmitAction._ +import org.apache.spark.launcher.SparkSubmitArgumentsParser import org.apache.spark.util.Utils /** * Parses and encapsulates arguments from the spark-submit script. * The env argument is used for testing. */ -private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) { +private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) + extends SparkSubmitArgumentsParser { var master: String = null var deployMode: String = null var executorMemory: String = null @@ -84,7 +88,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St } // Set parameters from command line arguments - parseOpts(args.toList) + try { + parse(args.toList) + } catch { + case e: IllegalArgumentException => + SparkSubmit.printErrorAndExit(e.getMessage()) + } // Populate `sparkProperties` map from properties file mergeDefaultSparkProperties() // Use `sparkProperties` map along with env vars to fill in any missing parameters @@ -277,167 +286,139 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St """.stripMargin } - /** - * Fill in values by parsing user options. - * NOTE: Any changes here must be reflected in YarnClientSchedulerBackend. - */ - private def parseOpts(opts: Seq[String]): Unit = { - val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r - - // Delineates parsing of Spark options from parsing of user options. - parse(opts) - - /** - * NOTE: If you add or remove spark-submit options, - * modify NOT ONLY this file but also utils.sh - */ - def parse(opts: Seq[String]): Unit = opts match { - case ("--name") :: value :: tail => + /** Fill in values by parsing user options. */ + override protected def handle(opt: String, value: String): Boolean = { + opt match { + case NAME => name = value - parse(tail) - case ("--master") :: value :: tail => + case MASTER => master = value - parse(tail) - case ("--class") :: value :: tail => + case CLASS => mainClass = value - parse(tail) - case ("--deploy-mode") :: value :: tail => + case DEPLOY_MODE => if (value != "client" && value != "cluster") { SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"") } deployMode = value - parse(tail) - case ("--num-executors") :: value :: tail => + case NUM_EXECUTORS => numExecutors = value - parse(tail) - case ("--total-executor-cores") :: value :: tail => + case TOTAL_EXECUTOR_CORES => totalExecutorCores = value - parse(tail) - case ("--executor-cores") :: value :: tail => + case EXECUTOR_CORES => executorCores = value - parse(tail) - case ("--executor-memory") :: value :: tail => + case EXECUTOR_MEMORY => executorMemory = value - parse(tail) - case ("--driver-memory") :: value :: tail => + case DRIVER_MEMORY => driverMemory = value - parse(tail) - case ("--driver-cores") :: value :: tail => + case DRIVER_CORES => driverCores = value - parse(tail) - case ("--driver-class-path") :: value :: tail => + case DRIVER_CLASS_PATH => driverExtraClassPath = value - parse(tail) - case ("--driver-java-options") :: value :: tail => + case DRIVER_JAVA_OPTIONS => driverExtraJavaOptions = value - parse(tail) - case ("--driver-library-path") :: value :: tail => + case DRIVER_LIBRARY_PATH => driverExtraLibraryPath = value - parse(tail) - case ("--properties-file") :: value :: tail => + case PROPERTIES_FILE => propertiesFile = value - parse(tail) - case ("--kill") :: value :: tail => + case KILL_SUBMISSION => submissionToKill = value if (action != null) { SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.") } action = KILL - parse(tail) - case ("--status") :: value :: tail => + case STATUS => submissionToRequestStatusFor = value if (action != null) { SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.") } action = REQUEST_STATUS - parse(tail) - case ("--supervise") :: tail => + case SUPERVISE => supervise = true - parse(tail) - case ("--queue") :: value :: tail => + case QUEUE => queue = value - parse(tail) - case ("--files") :: value :: tail => + case FILES => files = Utils.resolveURIs(value) - parse(tail) - case ("--py-files") :: value :: tail => + case PY_FILES => pyFiles = Utils.resolveURIs(value) - parse(tail) - case ("--archives") :: value :: tail => + case ARCHIVES => archives = Utils.resolveURIs(value) - parse(tail) - case ("--jars") :: value :: tail => + case JARS => jars = Utils.resolveURIs(value) - parse(tail) - case ("--packages") :: value :: tail => + case PACKAGES => packages = value - parse(tail) - case ("--repositories") :: value :: tail => + case REPOSITORIES => repositories = value - parse(tail) - case ("--conf" | "-c") :: value :: tail => + case CONF => value.split("=", 2).toSeq match { case Seq(k, v) => sparkProperties(k) = v case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value") } - parse(tail) - case ("--proxy-user") :: value :: tail => + case PROXY_USER => proxyUser = value - parse(tail) - case ("--help" | "-h") :: tail => + case HELP => printUsageAndExit(0) - case ("--verbose" | "-v") :: tail => + case VERBOSE => verbose = true - parse(tail) - case ("--version") :: tail => + case VERSION => SparkSubmit.printVersionAndExit() - case EQ_SEPARATED_OPT(opt, value) :: tail => - parse(opt :: value :: tail) + case _ => + throw new IllegalArgumentException(s"Unexpected argument '$opt'.") + } + true + } - case value :: tail if value.startsWith("-") => - SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.") + /** + * Handle unrecognized command line options. + * + * The first unrecognized option is treated as the "primary resource". Everything else is + * treated as application arguments. + */ + override protected def handleUnknown(opt: String): Boolean = { + if (opt.startsWith("-")) { + SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.") + } - case value :: tail => - primaryResource = - if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) { - Utils.resolveURI(value).toString - } else { - value - } - isPython = SparkSubmit.isPython(value) - childArgs ++= tail + primaryResource = + if (!SparkSubmit.isShell(opt) && !SparkSubmit.isInternal(opt)) { + Utils.resolveURI(opt).toString + } else { + opt + } + isPython = SparkSubmit.isPython(opt) + false + } - case Nil => - } + override protected def handleExtraArgs(extra: JList[String]): Unit = { + childArgs ++= extra } private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala deleted file mode 100644 index 311048cdaa..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy - -import scala.collection.JavaConversions._ - -import org.apache.spark.util.{RedirectThread, Utils} - -/** - * Launch an application through Spark submit in client mode with the appropriate classpath, - * library paths, java options and memory. These properties of the JVM must be set before the - * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity - * of parsing the properties file for such relevant configs in Bash. - * - * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args> - */ -private[spark] object SparkSubmitDriverBootstrapper { - - // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`. - // Any changes made there must be reflected in this file. - - def main(args: Array[String]): Unit = { - - // This should be called only from `bin/spark-class` - if (!sys.env.contains("SPARK_CLASS")) { - System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!") - System.exit(1) - } - - val submitArgs = args - val runner = sys.env("RUNNER") - val classpath = sys.env("CLASSPATH") - val javaOpts = sys.env("JAVA_OPTS") - val defaultDriverMemory = sys.env("OUR_JAVA_MEM") - - // Spark submit specific environment variables - val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE") - val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE") - val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER") - val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY") - val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH") - val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH") - val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS") - - assume(runner != null, "RUNNER must be set") - assume(classpath != null, "CLASSPATH must be set") - assume(javaOpts != null, "JAVA_OPTS must be set") - assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set") - assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!") - assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set") - assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set") - - // Parse the properties file for the equivalent spark.driver.* configs - val properties = Utils.getPropertiesFromFile(propertiesFile) - val confDriverMemory = properties.get("spark.driver.memory") - val confLibraryPath = properties.get("spark.driver.extraLibraryPath") - val confClasspath = properties.get("spark.driver.extraClassPath") - val confJavaOpts = properties.get("spark.driver.extraJavaOptions") - - // Favor Spark submit arguments over the equivalent configs in the properties file. - // Note that we do not actually use the Spark submit values for library path, classpath, - // and Java opts here, because we have already captured them in Bash. - - val newDriverMemory = submitDriverMemory - .orElse(confDriverMemory) - .getOrElse(defaultDriverMemory) - - val newClasspath = - if (submitClasspath.isDefined) { - classpath - } else { - classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("") - } - - val newJavaOpts = - if (submitJavaOpts.isDefined) { - // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS - javaOpts - } else { - javaOpts + confJavaOpts.map(" " + _).getOrElse("") - } - - val filteredJavaOpts = Utils.splitCommandString(newJavaOpts) - .filterNot(_.startsWith("-Xms")) - .filterNot(_.startsWith("-Xmx")) - - // Build up command - val command: Seq[String] = - Seq(runner) ++ - Seq("-cp", newClasspath) ++ - filteredJavaOpts ++ - Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++ - Seq("org.apache.spark.deploy.SparkSubmit") ++ - submitArgs - - // Print the launch command. This follows closely the format used in `bin/spark-class`. - if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) { - System.err.print("Spark Command: ") - System.err.println(command.mkString(" ")) - System.err.println("========================================\n") - } - - // Start the driver JVM - val filteredCommand = command.filter(_.nonEmpty) - val builder = new ProcessBuilder(filteredCommand) - val env = builder.environment() - - if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) { - val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName) - env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator"))) - } - - val process = builder.start() - - // If we kill an app while it's running, its sub-process should be killed too. - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() = { - if (process != null) { - process.destroy() - process.waitFor() - } - } - }) - - // Redirect stdout and stderr from the child JVM - val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout") - val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr") - stdoutThread.start() - stderrThread.start() - - // Redirect stdin to child JVM only if we're not running Windows. This is because the - // subprocess there already reads directly from our stdin, so we should avoid spawning a - // thread that contends with the subprocess in reading from System.in. - val isWindows = Utils.isWindows - val isSubprocess = sys.env.contains("IS_SUBPROCESS") - if (!isWindows) { - val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin", - propagateEof = true) - stdinThread.start() - // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on - // broken pipe, signaling that the parent process has exited. This is the case if the - // application is launched directly from python, as in the PySpark shell. In Windows, - // the termination logic is handled in java_gateway.py - if (isSubprocess) { - stdinThread.join() - process.destroy() - } - } - val returnCode = process.waitFor() - stdoutThread.join() - stderrThread.join() - sys.exit(returnCode) - } - -} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 3e013c3209..83f78cf473 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -20,10 +20,12 @@ package org.apache.spark.deploy.worker import java.io.{File, FileOutputStream, InputStream, IOException} import java.lang.System._ +import scala.collection.JavaConversions._ import scala.collection.Map import org.apache.spark.Logging import org.apache.spark.deploy.Command +import org.apache.spark.launcher.WorkerCommandBuilder import org.apache.spark.util.Utils /** @@ -54,12 +56,10 @@ object CommandUtils extends Logging { } private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { - val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java") - // SPARK-698: do not call the run.cmd script, as process.destroy() // fails to kill a process tree on Windows - Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++ - command.arguments + val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand() + cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments } /** @@ -92,44 +92,6 @@ object CommandUtils extends Logging { command.javaOpts) } - /** - * Attention: this must always be aligned with the environment variables in the run scripts and - * the way the JAVA_OPTS are assembled there. - */ - private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = { - val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") - - // Exists for backwards compatibility with older Spark versions - val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString) - .getOrElse(Nil) - if (workerLocalOpts.length > 0) { - logWarning("SPARK_JAVA_OPTS was set on the worker. It is deprecated in Spark 1.0.") - logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.") - } - - // Figure out our classpath with the external compute-classpath script - val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" - val classPath = Utils.executeAndGetOutput( - Seq(sparkHome + "/bin/compute-classpath" + ext), - extraEnvironment = command.environment) - val userClassPath = command.classPathEntries ++ Seq(classPath) - - val javaVersion = System.getProperty("java.version") - - val javaOpts = workerLocalOpts ++ command.javaOpts - - val permGenOpt = - if (!javaVersion.startsWith("1.8") && !javaOpts.exists(_.startsWith("-XX:MaxPermSize="))) { - // do not specify -XX:MaxPermSize if it was already specified by user - Some("-XX:MaxPermSize=128m") - } else { - None - } - - Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++ - permGenOpt ++ javaOpts ++ memoryOpts - } - /** Spawn a thread that will redirect a given stream to a file */ def redirectStream(in: InputStream, file: File) { val out = new FileOutputStream(file, true) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index bed0a08d4d..a897e53218 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -49,7 +49,6 @@ private[spark] class Executor( isLocal: Boolean = false) extends Logging { - logInfo(s"Starting executor ID $executorId on host $executorHostname") // Application dependencies (added through SparkContext) that we've fetched so far on this node. diff --git a/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala b/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala new file mode 100644 index 0000000000..a835012531 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher + +/** + * This class makes SparkSubmitOptionParser visible for Spark code outside of the `launcher` + * package, since Java doesn't have a feature similar to `private[spark]`, and we don't want + * that class to be public. + */ +private[spark] abstract class SparkSubmitArgumentsParser extends SparkSubmitOptionParser diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala new file mode 100644 index 0000000000..9be98723ae --- /dev/null +++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher + +import java.io.File +import java.util.{HashMap => JHashMap, List => JList, Map => JMap} + +import scala.collection.JavaConversions._ + +import org.apache.spark.deploy.Command + +/** + * This class is used by CommandUtils. It uses some package-private APIs in SparkLauncher, and since + * Java doesn't have a feature similar to `private[spark]`, and we don't want that class to be + * public, needs to live in the same package as the rest of the library. + */ +private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, command: Command) + extends AbstractCommandBuilder { + + childEnv.putAll(command.environment) + childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sparkHome) + + override def buildCommand(env: JMap[String, String]): JList[String] = { + val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator)) + cmd.add(s"-Xms${memoryMb}M") + cmd.add(s"-Xmx${memoryMb}M") + command.javaOpts.foreach(cmd.add) + addPermGenSizeOpt(cmd) + addOptionString(cmd, getenv("SPARK_JAVA_OPTS")) + cmd + } + + def buildCommand(): JList[String] = buildCommand(new JHashMap[String, String]()) + +} |