author     Patrick Wendell <pwendell@gmail.com>    2014-03-31 12:07:14 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-03-31 12:07:14 -0700
commit     841721e03cc44ee7d8fe72c882db8c0f9f3af365 (patch)
tree       27565f026eb4700822b66471d062c0d586f1d2a6 /core
parent     d666053679ded5a32088c1758d20101126e23af6 (diff)
SPARK-1352: Improve robustness of spark-submit script
1. Better error messages when required arguments are missing.
2. Support for unit testing cases where presented arguments are invalid.
3. Bug fix: Only use environment variables when they are set (otherwise will cause NPE).
4. A verbose mode to aid debugging.
5. Visibility of several variables is set to private.
6. Deprecation warning for existing scripts.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #271 from pwendell/spark-submit and squashes the following commits:

9146def [Patrick Wendell] SPARK-1352: Improve robustness of spark-submit script
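To illustrate item 3, here is a minimal, self-contained sketch of the null-safe environment-variable pattern this patch adopts (the object and default value are illustrative; the patch applies the pattern to MASTER and DEPLOY_MODE in SparkSubmitArguments.loadEnvVars):

object EnvVarGuard {
  // Default retained when the variable is unset.
  var master: String = "local"

  def loadEnvVars(): Unit = {
    // System.getenv returns null for unset variables; Option(null) is None,
    // so the assignment only runs when MASTER is actually set, and a later
    // master.startsWith(...) can no longer throw a NullPointerException.
    Option(System.getenv("MASTER")).foreach(v => master = v)
  }

  def main(args: Array[String]): Unit = {
    loadEnvVars()
    println(s"master = $master")
  }
}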
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala               |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala          | 67
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 74
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala     | 61
4 files changed, 157 insertions(+), 48 deletions(-)
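The testability change (item 2) works by routing all diagnostics through a swappable PrintStream and replacing the direct System.exit call with an injectable exit function, which SparkSubmitSuite below overrides. A minimal sketch of that seam, with illustrative names (Cli, CliTest) not taken from the patch:

import java.io.{OutputStream, PrintStream}
import scala.collection.mutable.ArrayBuffer

object Cli {
  // Swappable seams: production uses stderr and System.exit;
  // tests substitute a capturing stream and a no-op exit.
  var printStream: PrintStream = System.err
  var exitFn: () => Unit = () => System.exit(-1)

  def printErrorAndExit(msg: String): Unit = {
    printStream.println("error: " + msg)
    exitFn()
  }
}

object CliTest {
  def main(args: Array[String]): Unit = {
    val lines = ArrayBuffer[String]()
    // Capture println output into a buffer; raw byte writes are discarded.
    Cli.printStream = new PrintStream(new OutputStream { def write(b: Int): Unit = {} }) {
      override def println(line: String): Unit = lines += line
    }
    var exited = false
    Cli.exitFn = () => exited = true  // record the exit instead of killing the JVM

    Cli.printErrorAndExit("Must specify a primary resource")
    assert(exited && lines.exists(_.contains("primary resource")))
    println("seam test passed")
  }
}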
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d9e3035e1a..8fd2c7e95b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
*/
object Client {
def main(args: Array[String]) {
+ println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
+ println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 24a9c98e18..1fa7991904 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.File
+import java.io.{PrintStream, File}
import java.net.URL
import org.apache.spark.executor.ExecutorURLClassLoader
@@ -32,38 +32,51 @@ import scala.collection.mutable.Map
* modes that Spark supports.
*/
object SparkSubmit {
- val YARN = 1
- val STANDALONE = 2
- val MESOS = 4
- val LOCAL = 8
- val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+ private val YARN = 1
+ private val STANDALONE = 2
+ private val MESOS = 4
+ private val LOCAL = 8
+ private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
- var clusterManager: Int = LOCAL
+ private var clusterManager: Int = LOCAL
def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
+ if (appArgs.verbose) {
+ printStream.println(appArgs)
+ }
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
- launch(childArgs, classpath, sysProps, mainClass)
+ launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}
+ // Exposed for testing
+ private[spark] var printStream: PrintStream = System.err
+ private[spark] var exitFn: () => Unit = () => System.exit(-1)
+
+ private[spark] def printErrorAndExit(str: String) = {
+ printStream.println("error: " + str)
+ printStream.println("run with --help for more information or --verbose for debugging output")
+ exitFn()
+ }
+ private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+
/**
* @return
* a tuple containing the arguments for the child, a list of classpath
* entries for the child, a map of system properties, and the main class for the child
*/
- def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+ private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
- if (appArgs.master.startsWith("yarn")) {
+ if (appArgs.master.startsWith("local")) {
+ clusterManager = LOCAL
+ } else if (appArgs.master.startsWith("yarn")) {
clusterManager = YARN
} else if (appArgs.master.startsWith("spark")) {
clusterManager = STANDALONE
} else if (appArgs.master.startsWith("mesos")) {
clusterManager = MESOS
- } else if (appArgs.master.startsWith("local")) {
- clusterManager = LOCAL
} else {
- System.err.println("master must start with yarn, mesos, spark, or local")
- System.exit(1)
+ printErrorAndExit("master must start with yarn, mesos, spark, or local")
}
// Because "yarn-standalone" and "yarn-client" encapsulate both the master
@@ -73,12 +86,10 @@ object SparkSubmit {
appArgs.deployMode = "cluster"
}
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
- System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
- System.exit(1)
+ printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
- System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
- System.exit(1)
+ printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
}
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-standalone"
@@ -95,8 +106,7 @@ object SparkSubmit {
var childMainClass = ""
if (clusterManager == MESOS && deployOnCluster) {
- System.err.println("Mesos does not support running the driver on the cluster")
- System.exit(1)
+ printErrorAndExit("Mesos does not support running the driver on the cluster")
}
if (!deployOnCluster) {
@@ -174,8 +184,17 @@ object SparkSubmit {
(childArgs, childClasspath, sysProps, childMainClass)
}
- def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
- sysProps: Map[String, String], childMainClass: String) {
+ private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
+ sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
+
+ if (verbose) {
+ System.err.println(s"Main class:\n$childMainClass")
+ System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
+ System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
+ System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+ System.err.println("\n")
+ }
+
val loader = new ExecutorURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
@@ -193,10 +212,10 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
}
- def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+ private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(localJar)
if (!localJarFile.exists()) {
- System.err.println("Jar does not exist: " + localJar + ". Skipping.")
+ printWarning(s"Jar $localJar does not exist, skipping.")
}
val url = localJarFile.getAbsoluteFile.toURI.toURL
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index ff2aa68908..9c8f54ea6f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
+ var verbose: Boolean = false
loadEnvVars()
- parseArgs(args.toList)
-
- def loadEnvVars() {
- master = System.getenv("MASTER")
- deployMode = System.getenv("DEPLOY_MODE")
+ parseOpts(args.toList)
+
+ // Sanity checks
+ if (args.length == 0) printUsageAndExit(-1)
+ if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+ if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+
+ override def toString = {
+ s"""Parsed arguments:
+ | master $master
+ | deployMode $deployMode
+ | executorMemory $executorMemory
+ | executorCores $executorCores
+ | totalExecutorCores $totalExecutorCores
+ | driverMemory $driverMemory
+ | driverCores $driverCores
+ | supervise $supervise
+ | queue $queue
+ | numExecutors $numExecutors
+ | files $files
+ | archives $archives
+ | mainClass $mainClass
+ | primaryResource $primaryResource
+ | name $name
+ | childArgs [${childArgs.mkString(" ")}]
+ | jars $jars
+ | verbose $verbose
+ """.stripMargin
}
- def parseArgs(args: List[String]) {
- if (args.size == 0) {
- printUsageAndExit(1)
- System.exit(1)
- }
- primaryResource = args(0)
- parseOpts(args.tail)
+ private def loadEnvVars() {
+ Option(System.getenv("MASTER")).map(master = _)
+ Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
}
- def parseOpts(opts: List[String]): Unit = opts match {
+ private def parseOpts(opts: List[String]): Unit = opts match {
case ("--name") :: value :: tail =>
name = value
parseOpts(tail)
@@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
case ("--deploy-mode") :: value :: tail =>
if (value != "client" && value != "cluster") {
- System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
- System.exit(1)
+ SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
parseOpts(tail)
@@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)
- case Nil =>
+ case ("--verbose" | "-v") :: tail =>
+ verbose = true
+ parseOpts(tail)
- case _ =>
- printUsageAndExit(1, opts)
+ case value :: tail =>
+ if (primaryResource != null) {
+ val error = s"Found two conflicting resources, $value and $primaryResource." +
+ " Expecting only one resource."
+ SparkSubmit.printErrorAndExit(error)
+ }
+ primaryResource = value
+ parseOpts(tail)
+
+ case Nil =>
}
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ val outStream = SparkSubmit.printStream
if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
+ outStream.println("Unknown/unsupported param " + unknownParam)
}
- System.err.println(
+ outStream.println(
"""Usage: spark-submit <primary binary> [options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
@@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working dir of each executor.""".stripMargin
)
- System.exit(exitCode)
+ SparkSubmit.exitFn()
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 29fef2ed8c..4e489cd9b6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,14 +17,71 @@
package org.apache.spark.deploy
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
+
import org.apache.spark.deploy.SparkSubmit._
+
class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+
+ val noOpOutputStream = new OutputStream {
+ def write(b: Int) = {}
+ }
+
+ /** Simple PrintStream that reads data into a buffer */
+ class BufferPrintStream extends PrintStream(noOpOutputStream) {
+ var lineBuffer = ArrayBuffer[String]()
+ override def println(line: String) {
+ lineBuffer += line
+ }
+ }
+
+ /** Returns true if the script exits and the given search string is printed. */
+ def testPrematureExit(input: Array[String], searchString: String): Boolean = {
+ val printStream = new BufferPrintStream()
+ SparkSubmit.printStream = printStream
+
+ @volatile var exitedCleanly = false
+ SparkSubmit.exitFn = () => exitedCleanly = true
+
+ val thread = new Thread {
+ override def run() = try {
+ SparkSubmit.main(input)
+ } catch {
+ // If exceptions occur after the "exit" has happened, fine to ignore them.
+ // These represent code paths not reachable during normal execution.
+ case e: Exception => if (!exitedCleanly) throw e
+ }
+ }
+ thread.start()
+ thread.join()
+ printStream.lineBuffer.exists(s => s.contains(searchString))
+ }
+
test("prints usage on empty input") {
- val clArgs = Array[String]()
- // val appArgs = new SparkSubmitArguments(clArgs)
+ testPrematureExit(Array[String](), "Usage: spark-submit") should be (true)
+ }
+
+ test("prints usage with only --help") {
+ testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
+ }
+
+ test("handles multiple binary definitions") {
+ val adjacentJars = Array("foo.jar", "bar.jar")
+ testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)
+
+ val nonAdjacentJars =
+ Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
+ testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true)
+ }
+
+ test("handle binary specified but not class") {
+ testPrematureExit(Array("foo.jar"), "must specify a main class")
}
test("handles YARN cluster mode") {