From 841721e03cc44ee7d8fe72c882db8c0f9f3af365 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 31 Mar 2014 12:07:14 -0700 Subject: SPARK-1352: Improve robustness of spark-submit script 1. Better error messages when required arguments are missing. 2. Support for unit testing cases where presented arguments are invalid. 3. Bug fix: Only use environment varaibles when they are set (otherwise will cause NPE). 4. A verbose mode to aid debugging. 5. Visibility of several variables is set to private. 6. Deprecation warning for existing scripts. Author: Patrick Wendell Closes #271 from pwendell/spark-submit and squashes the following commits: 9146def [Patrick Wendell] SPARK-1352: Improve robustness of spark-submit script --- .../scala/org/apache/spark/deploy/Client.scala | 3 + .../org/apache/spark/deploy/SparkSubmit.scala | 67 +++++++++++++------- .../apache/spark/deploy/SparkSubmitArguments.scala | 74 +++++++++++++++------- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 61 +++++++++++++++++- 4 files changed, 157 insertions(+), 48 deletions(-) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index d9e3035e1a..8fd2c7e95b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends */ object Client { def main(args: Array[String]) { + println("WARNING: This client is deprecated and will be removed in a future version of Spark.") + println("Use ./bin/spark-submit with \"--master spark://host:port\"") + val conf = new SparkConf() val driverArgs = new ClientArguments(args) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 24a9c98e18..1fa7991904 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy -import java.io.File +import java.io.{PrintStream, File} import java.net.URL import org.apache.spark.executor.ExecutorURLClassLoader @@ -32,38 +32,51 @@ import scala.collection.mutable.Map * modes that Spark supports. */ object SparkSubmit { - val YARN = 1 - val STANDALONE = 2 - val MESOS = 4 - val LOCAL = 8 - val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL + private val YARN = 1 + private val STANDALONE = 2 + private val MESOS = 4 + private val LOCAL = 8 + private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL - var clusterManager: Int = LOCAL + private var clusterManager: Int = LOCAL def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) + if (appArgs.verbose) { + printStream.println(appArgs) + } val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) - launch(childArgs, classpath, sysProps, mainClass) + launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose) } + // Exposed for testing + private[spark] var printStream: PrintStream = System.err + private[spark] var exitFn: () => Unit = () => System.exit(-1) + + private[spark] def printErrorAndExit(str: String) = { + printStream.println("error: " + str) + printStream.println("run with --help for more information or --verbose for debugging output") + exitFn() + } + private[spark] def printWarning(str: String) = printStream.println("warning: " + str) + /** * @return * a tuple containing the arguments for the child, a list of classpath * entries for the child, and the main class for the child */ - def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String], + private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { - if (appArgs.master.startsWith("yarn")) { + if (appArgs.master.startsWith("local")) { + clusterManager = LOCAL + } else if (appArgs.master.startsWith("yarn")) { clusterManager = YARN } else if (appArgs.master.startsWith("spark")) { clusterManager = STANDALONE } else if (appArgs.master.startsWith("mesos")) { clusterManager = MESOS - } else if (appArgs.master.startsWith("local")) { - clusterManager = LOCAL } else { - System.err.println("master must start with yarn, mesos, spark, or local") - System.exit(1) + printErrorAndExit("master must start with yarn, mesos, spark, or local") } // Because "yarn-standalone" and "yarn-client" encapsulate both the master @@ -73,12 +86,10 @@ object SparkSubmit { appArgs.deployMode = "cluster" } if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") { - System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds") - System.exit(1) + printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible") } if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") { - System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds") - System.exit(1) + printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible") } if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) { appArgs.master = "yarn-standalone" @@ -95,8 +106,7 @@ object SparkSubmit { var childMainClass = "" if (clusterManager == MESOS && deployOnCluster) { - System.err.println("Mesos does not support running the driver on the cluster") - System.exit(1) + printErrorAndExit("Mesos does not support running the driver on the cluster") } if (!deployOnCluster) { @@ -174,8 +184,17 @@ object SparkSubmit { (childArgs, childClasspath, sysProps, childMainClass) } - def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String], - sysProps: Map[String, String], childMainClass: String) { + private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String], + sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) { + + if (verbose) { + System.err.println(s"Main class:\n$childMainClass") + System.err.println(s"Arguments:\n${childArgs.mkString("\n")}") + System.err.println(s"System properties:\n${sysProps.mkString("\n")}") + System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") + System.err.println("\n") + } + val loader = new ExecutorURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) Thread.currentThread.setContextClassLoader(loader) @@ -193,10 +212,10 @@ object SparkSubmit { mainMethod.invoke(null, childArgs.toArray) } - def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { + private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { val localJarFile = new File(localJar) if (!localJarFile.exists()) { - System.err.println("Jar does not exist: " + localJar + ". Skipping.") + printWarning(s"Jar $localJar does not exist, skipping.") } val url = localJarFile.getAbsoluteFile.toURI.toURL diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index ff2aa68908..9c8f54ea6f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { var name: String = null var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() var jars: String = null + var verbose: Boolean = false loadEnvVars() - parseArgs(args.toList) - - def loadEnvVars() { - master = System.getenv("MASTER") - deployMode = System.getenv("DEPLOY_MODE") + parseOpts(args.toList) + + // Sanity checks + if (args.length == 0) printUsageAndExit(-1) + if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource") + if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class") + + override def toString = { + s"""Parsed arguments: + | master $master + | deployMode $deployMode + | executorMemory $executorMemory + | executorCores $executorCores + | totalExecutorCores $totalExecutorCores + | driverMemory $driverMemory + | drivercores $driverCores + | supervise $supervise + | queue $queue + | numExecutors $numExecutors + | files $files + | archives $archives + | mainClass $mainClass + | primaryResource $primaryResource + | name $name + | childArgs [${childArgs.mkString(" ")}] + | jars $jars + | verbose $verbose + """.stripMargin } - def parseArgs(args: List[String]) { - if (args.size == 0) { - printUsageAndExit(1) - System.exit(1) - } - primaryResource = args(0) - parseOpts(args.tail) + private def loadEnvVars() { + Option(System.getenv("MASTER")).map(master = _) + Option(System.getenv("DEPLOY_MODE")).map(deployMode = _) } - def parseOpts(opts: List[String]): Unit = opts match { + private def parseOpts(opts: List[String]): Unit = opts match { case ("--name") :: value :: tail => name = value parseOpts(tail) @@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { case ("--deploy-mode") :: value :: tail => if (value != "client" && value != "cluster") { - System.err.println("--deploy-mode must be either \"client\" or \"cluster\"") - System.exit(1) + SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"") } deployMode = value parseOpts(tail) @@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { case ("--help" | "-h") :: tail => printUsageAndExit(0) - case Nil => + case ("--verbose" | "-v") :: tail => + verbose = true + parseOpts(tail) - case _ => - printUsageAndExit(1, opts) + case value :: tail => + if (primaryResource != null) { + val error = s"Found two conflicting resources, $value and $primaryResource." + + " Expecting only one resource." + SparkSubmit.printErrorAndExit(error) + } + primaryResource = value + parseOpts(tail) + + case Nil => } - def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { + private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { + val outStream = SparkSubmit.printStream if (unknownParam != null) { - System.err.println("Unknown/unsupported param " + unknownParam) + outStream.println("Unknown/unsupported param " + unknownParam) } - System.err.println( + outStream.println( """Usage: spark-submit [options] |Options: | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. @@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { | --archives ARCHIVES Comma separated list of archives to be extracted into the | working dir of each executor.""".stripMargin ) - System.exit(exitCode) + SparkSubmit.exitFn() } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 29fef2ed8c..4e489cd9b6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -17,14 +17,71 @@ package org.apache.spark.deploy +import java.io.{OutputStream, PrintStream} + +import scala.collection.mutable.ArrayBuffer + import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers + import org.apache.spark.deploy.SparkSubmit._ + class SparkSubmitSuite extends FunSuite with ShouldMatchers { + + val noOpOutputStream = new OutputStream { + def write(b: Int) = {} + } + + /** Simple PrintStream that reads data into a buffer */ + class BufferPrintStream extends PrintStream(noOpOutputStream) { + var lineBuffer = ArrayBuffer[String]() + override def println(line: String) { + lineBuffer += line + } + } + + /** Returns true if the script exits and the given search string is printed. */ + def testPrematureExit(input: Array[String], searchString: String): Boolean = { + val printStream = new BufferPrintStream() + SparkSubmit.printStream = printStream + + @volatile var exitedCleanly = false + SparkSubmit.exitFn = () => exitedCleanly = true + + val thread = new Thread { + override def run() = try { + SparkSubmit.main(input) + } catch { + // If exceptions occur after the "exit" has happened, fine to ignore them. + // These represent code paths not reachable during normal execution. + case e: Exception => if (!exitedCleanly) throw e + } + } + thread.start() + thread.join() + printStream.lineBuffer.find(s => s.contains(searchString)).size > 0 + } + test("prints usage on empty input") { - val clArgs = Array[String]() - // val appArgs = new SparkSubmitArguments(clArgs) + testPrematureExit(Array[String](), "Usage: spark-submit") should be (true) + } + + test("prints usage with only --help") { + testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true) + } + + test("handles multiple binary definitions") { + val adjacentJars = Array("foo.jar", "bar.jar") + testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true) + + val nonAdjacentJars = + Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar") + testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true) + } + + test("handle binary specified but not class") { + testPrematureExit(Array("foo.jar"), "must specify a main class") } test("handles YARN cluster mode") { -- cgit v1.2.3