diff options
Diffstat (limited to 'core/src')
18 files changed, 479 insertions, 114 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index b947feb891..bd21fdc5a1 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -208,6 +208,82 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { new SparkConf(false).setAll(settings) } + /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not + * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */ + private[spark] def validateSettings() { + if (settings.contains("spark.local.dir")) { + val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " + + "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)." + logWarning(msg) + } + + val executorOptsKey = "spark.executor.extraJavaOptions" + val executorClasspathKey = "spark.executor.extraClassPath" + val driverOptsKey = "spark.driver.extraJavaOptions" + val driverClassPathKey = "spark.driver.extraClassPath" + + // Validate spark.executor.extraJavaOptions + settings.get(executorOptsKey).map { javaOpts => + if (javaOpts.contains("-Dspark")) { + val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts)'. " + + "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit." + throw new Exception(msg) + } + if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) { + val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). " + + "Use spark.executor.memory instead." + throw new Exception(msg) + } + } + + // Check for legacy configs + sys.env.get("SPARK_JAVA_OPTS").foreach { value => + val error = + s""" + |SPARK_JAVA_OPTS was detected (set to '$value'). + |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+. + | + |Please instead use: + | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application + | - ./spark-submit with --driver-java-options to set -X options for a driver + | - spark.executor.extraJavaOptions to set -X options for executors + | - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker) + """.stripMargin + logError(error) + + for (key <- Seq(executorOptsKey, driverOptsKey)) { + if (getOption(key).isDefined) { + throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.") + } else { + logWarning(s"Setting '$key' to '$value' as a work-around.") + set(key, value) + } + } + } + + sys.env.get("SPARK_CLASSPATH").foreach { value => + val error = + s""" + |SPARK_CLASSPATH was detected (set to '$value'). + | This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+. + | + |Please instead use: + | - ./spark-submit with --driver-class-path to augment the driver classpath + | - spark.executor.extraClassPath to augment the executor classpath + """.stripMargin + logError(error) + + for (key <- Seq(executorClasspathKey, driverClassPathKey)) { + if (getOption(key).isDefined) { + throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.") + } else { + logWarning(s"Setting '$key' to '$value' as a work-around.") + set(key, value) + } + } + } + } + /** * Return a string listing all keys and values, one per line. This is useful to print the * configuration out for debugging. diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d3ef75bc73..7933d68d67 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -148,6 +148,7 @@ class SparkContext(config: SparkConf) extends Logging { this(master, appName, sparkHome, jars, Map(), Map()) private[spark] val conf = config.clone() + conf.validateSettings() /** * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be @@ -159,7 +160,7 @@ class SparkContext(config: SparkConf) extends Logging { throw new SparkException("A master URL must be set in your configuration") } if (!conf.contains("spark.app.name")) { - throw new SparkException("An application must be set in your configuration") + throw new SparkException("An application name must be set in your configuration") } if (conf.getBoolean("spark.logConf", false)) { @@ -170,11 +171,11 @@ class SparkContext(config: SparkConf) extends Logging { conf.setIfMissing("spark.driver.host", Utils.localHostName()) conf.setIfMissing("spark.driver.port", "0") - val jars: Seq[String] = if (conf.contains("spark.jars")) { - conf.get("spark.jars").split(",").filter(_.size != 0) - } else { - null - } + val jars: Seq[String] = + conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten + + val files: Seq[String] = + conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten val master = conf.get("spark.master") val appName = conf.get("spark.app.name") @@ -235,6 +236,10 @@ class SparkContext(config: SparkConf) extends Logging { jars.foreach(addJar) } + if (files != null) { + files.foreach(addFile) + } + private def warnSparkMem(value: String): String = { logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " + "deprecated, please use spark.executor.memory instead.") @@ -247,22 +252,20 @@ class SparkContext(config: SparkConf) extends Logging { .map(Utils.memoryStringToMb) .getOrElse(512) - // Environment variables to pass to our executors - private[spark] val executorEnvs = HashMap[String, String]() - for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS"); - value <- Option(System.getenv(key))) { - executorEnvs(key) = value - } + // Environment variables to pass to our executors. + // NOTE: This should only be used for test related settings. + private[spark] val testExecutorEnvs = HashMap[String, String]() + // Convert java options to env vars as a work around // since we can't set env vars directly in sbt. - for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing")) + for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing")) value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { - executorEnvs(envKey) = value + testExecutorEnvs(envKey) = value } // The Mesos scheduler backend relies on this environment variable to set executor memory. // TODO: Set this only in the Mesos scheduler. - executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" - executorEnvs ++= conf.getExecutorEnv + testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" + testExecutorEnvs ++= conf.getExecutorEnv // Set SPARK_USER for user who is running SparkContext. val sparkUser = Option { @@ -270,7 +273,7 @@ class SparkContext(config: SparkConf) extends Logging { }.getOrElse { SparkContext.SPARK_UNKNOWN_USER } - executorEnvs("SPARK_USER") = sparkUser + testExecutorEnvs("SPARK_USER") = sparkUser // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 8fd2c7e95b..7ead117152 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -54,8 +54,21 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends System.getenv().foreach{case (k, v) => env(k) = v} val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" + + val classPathConf = "spark.driver.extraClassPath" + val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => + cp.split(java.io.File.pathSeparator) + } + + val libraryPathConf = "spark.driver.extraLibraryPath" + val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => + cp.split(java.io.File.pathSeparator) + } + + val javaOptionsConf = "spark.driver.extraJavaOptions" + val javaOpts = sys.props.get(javaOptionsConf) val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ - driverArgs.driverOptions, env) + driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts) val driverDescription = new DriverDescription( driverArgs.jarUrl, diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala index fa8af9a646..32f3ba3850 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Command.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala @@ -22,5 +22,8 @@ import scala.collection.Map private[spark] case class Command( mainClass: String, arguments: Seq[String], - environment: Map[String, String]) { + environment: Map[String, String], + classPathEntries: Seq[String], + libraryPathEntries: Seq[String], + extraJavaOptions: Option[String] = None) { } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e5d593cade..1b1e0fce0e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -17,14 +17,12 @@ package org.apache.spark.deploy -import java.io.{PrintStream, File} +import java.io.{File, PrintStream} import java.net.{URI, URL} -import org.apache.spark.executor.ExecutorURLClassLoader +import scala.collection.mutable.{ArrayBuffer, HashMap, Map} -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map +import org.apache.spark.executor.ExecutorURLClassLoader /** * Scala code behind the spark-submit script. The script handles setting up the classpath with @@ -63,7 +61,8 @@ object SparkSubmit { /** * @return * a tuple containing the arguments for the child, a list of classpath - * entries for the child, and the main class for the child + * entries for the child, a list of system propertes, a list of env vars + * and the main class for the child */ private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { @@ -123,6 +122,12 @@ object SparkSubmit { val options = List[OptionAssigner]( new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), + new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true, + sysProp = "spark.driver.extraClassPath"), + new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true, + sysProp = "spark.driver.extraJavaOptions"), + new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true, + sysProp = "spark.driver.extraLibraryPath"), new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"), new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"), new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"), @@ -142,10 +147,14 @@ object SparkSubmit { new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"), new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"), - new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars") + new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"), + new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), + new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), + new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false, + sysProp = "spark.app.name") ) - // more jars + // For client mode make any added jars immediately visible on the classpath if (appArgs.jars != null && !deployOnCluster) { for (jar <- appArgs.jars.split(",")) { childClasspath += jar @@ -163,6 +172,14 @@ object SparkSubmit { } } + // For standalone mode, add the application jar automatically so the user doesn't have to + // call sc.addJar. TODO: Standalone mode in the cluster + if (clusterManager == STANDALONE) { + val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) + sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(",")) + println("SPARK JARS" + sysProps.get("spark.jars")) + } + if (deployOnCluster && clusterManager == STANDALONE) { if (appArgs.supervise) { childArgs += "--supervise" @@ -173,7 +190,7 @@ object SparkSubmit { childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass) } - // args + // Arguments to be passed to user program if (appArgs.childArgs != null) { if (!deployOnCluster || clusterManager == STANDALONE) { childArgs ++= appArgs.childArgs @@ -184,6 +201,10 @@ object SparkSubmit { } } + for ((k, v) <- appArgs.getDefaultSparkProperties) { + if (!sysProps.contains(k)) sysProps(k) = v + } + (childArgs, childClasspath, sysProps, childMainClass) } @@ -191,11 +212,11 @@ object SparkSubmit { sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) { if (verbose) { - System.err.println(s"Main class:\n$childMainClass") - System.err.println(s"Arguments:\n${childArgs.mkString("\n")}") - System.err.println(s"System properties:\n${sysProps.mkString("\n")}") - System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") - System.err.println("\n") + printStream.println(s"Main class:\n$childMainClass") + printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") + printStream.println(s"System properties:\n${sysProps.mkString("\n")}") + printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") + printStream.println("\n") } val loader = new ExecutorURLClassLoader(new Array[URL](0), @@ -226,6 +247,10 @@ object SparkSubmit { } } +/** + * Provides an indirection layer for passing arguments as system properties or flags to + * the user's driver program or to downstream launcher tools. + */ private[spark] class OptionAssigner(val value: String, val clusterManager: Int, val deployOnCluster: Boolean, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 834b3df2f1..02502adfbd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -17,18 +17,28 @@ package org.apache.spark.deploy -import scala.collection.mutable.ArrayBuffer +import java.io.{File, FileInputStream, IOException} +import java.util.Properties + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, ArrayBuffer} + +import org.apache.spark.SparkException /** * Parses and encapsulates arguments from the spark-submit script. */ private[spark] class SparkSubmitArguments(args: Array[String]) { - var master: String = "local" + var master: String = null var deployMode: String = null var executorMemory: String = null var executorCores: String = null var totalExecutorCores: String = null + var propertiesFile: String = null var driverMemory: String = null + var driverExtraClassPath: String = null + var driverExtraLibraryPath: String = null + var driverExtraJavaOptions: String = null var driverCores: String = null var supervise: Boolean = false var queue: String = null @@ -42,42 +52,102 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { var jars: String = null var verbose: Boolean = false - loadEnvVars() parseOpts(args.toList) + loadDefaults() + checkRequiredArguments() + + /** Return default present in the currently defined defaults file. */ + def getDefaultSparkProperties = { + val defaultProperties = new HashMap[String, String]() + if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile") + Option(propertiesFile).foreach { filename => + val file = new File(filename) + SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) => + if (k.startsWith("spark")) { + defaultProperties(k) = v + if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v") + } + else { + SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v") + } + } + } + defaultProperties + } + + /** Fill in any undefined values based on the current properties file or built-in defaults. */ + private def loadDefaults() = { + + // Use common defaults file, if not specified by user + if (propertiesFile == null) { + sys.env.get("SPARK_HOME").foreach { sparkHome => + val sep = File.separator + val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf" + val file = new File(defaultPath) + if (file.exists()) { + propertiesFile = file.getAbsolutePath + } + } + } + + val defaultProperties = getDefaultSparkProperties + // Use properties file as fallback for values which have a direct analog to + // arguments in this script. + master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull) + executorMemory = Option(executorMemory) + .getOrElse(defaultProperties.get("spark.executor.memory").orNull) + executorCores = Option(executorCores) + .getOrElse(defaultProperties.get("spark.executor.cores").orNull) + totalExecutorCores = Option(totalExecutorCores) + .getOrElse(defaultProperties.get("spark.cores.max").orNull) + name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull) + jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull) - // Sanity checks - if (args.length == 0) printUsageAndExit(-1) - if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource") - if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class") + // This supports env vars in older versions of Spark + master = Option(master).getOrElse(System.getenv("MASTER")) + deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE")) + + // Global defaults. These should be keep to minimum to avoid confusing behavior. + master = Option(master).getOrElse("local") + } + + /** Ensure that required fields exists. Call this only once all defaults are loaded. */ + private def checkRequiredArguments() = { + if (args.length == 0) printUsageAndExit(-1) + if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource") + if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class") + } override def toString = { s"""Parsed arguments: - | master $master - | deployMode $deployMode - | executorMemory $executorMemory - | executorCores $executorCores - | totalExecutorCores $totalExecutorCores - | driverMemory $driverMemory - | drivercores $driverCores - | supervise $supervise - | queue $queue - | numExecutors $numExecutors - | files $files - | archives $archives - | mainClass $mainClass - | primaryResource $primaryResource - | name $name - | childArgs [${childArgs.mkString(" ")}] - | jars $jars - | verbose $verbose + | master $master + | deployMode $deployMode + | executorMemory $executorMemory + | executorCores $executorCores + | totalExecutorCores $totalExecutorCores + | propertiesFile $propertiesFile + | driverMemory $driverMemory + | driverCores $driverCores + | driverExtraClassPath $driverExtraClassPath + | driverExtraLibraryPath $driverExtraLibraryPath + | driverExtraJavaOptions $driverExtraJavaOptions + | supervise $supervise + | queue $queue + | numExecutors $numExecutors + | files $files + | archives $archives + | mainClass $mainClass + | primaryResource $primaryResource + | name $name + | childArgs [${childArgs.mkString(" ")}] + | jars $jars + | verbose $verbose + | + |Default properties from $propertiesFile: + |${getDefaultSparkProperties.mkString(" ", "\n ", "\n")} """.stripMargin } - private def loadEnvVars() { - Option(System.getenv("MASTER")).map(master = _) - Option(System.getenv("DEPLOY_MODE")).map(deployMode = _) - } - private def parseOpts(opts: List[String]): Unit = opts match { case ("--name") :: value :: tail => name = value @@ -122,6 +192,22 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { driverCores = value parseOpts(tail) + case ("--driver-class-path") :: value :: tail => + driverExtraClassPath = value + parseOpts(tail) + + case ("--driver-java-options") :: value :: tail => + driverExtraJavaOptions = value + parseOpts(tail) + + case ("--driver-library-path") :: value :: tail => + driverExtraLibraryPath = value + parseOpts(tail) + + case ("--properties-file") :: value :: tail => + propertiesFile = value + parseOpts(tail) + case ("--supervise") :: tail => supervise = true parseOpts(tail) @@ -154,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { parseOpts(tail) case value :: tail => + if (value.startsWith("-")) { + val errMessage = s"Unrecognized option '$value'." + val suggestion: Option[String] = value match { + case v if v.startsWith("--") && v.contains("=") => + val parts = v.split("=") + Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?") + case _ => + None + } + SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse("")) + } + if (primaryResource != null) { val error = s"Found two conflicting resources, $value and $primaryResource." + " Expecting only one resource." @@ -178,11 +276,21 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { | --class CLASS_NAME Name of your app's main class (required for Java apps). | --arg ARG Argument to be passed to your application's main class. This | option can be specified multiple times for multiple args. - | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M). | --name NAME The name of your application (Default: 'Spark'). | --jars JARS A comma-separated list of local jars to include on the | driver classpath and that SparkContext.addJar will work | with. Doesn't work on standalone with 'cluster' deploy mode. + | --files FILES Comma separated list of files to be placed in the working dir + | of each executor. + | --properties-file FILE Path to a file from which to load extra properties. If not + | specified, this will look for conf/spark-defaults.conf. + | + | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M). + | --driver-java-options Extra Java options to pass to the driver + | --driver-library-path Extra library path entries to pass to the driver + | --driver-class-path Extra class path entries to pass to the driver + | + | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). | | Spark standalone with cluster deploy mode only: | --driver-cores NUM Cores for driver (Default: 1). @@ -193,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) { | | YARN-only: | --executor-cores NUM Number of cores per executor (Default: 1). - | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). | --queue QUEUE_NAME The YARN queue to submit to (Default: 'default'). | --num-executors NUM Number of executors to (Default: 2). - | --files FILES Comma separated list of files to be placed in the working dir - | of each executor. | --archives ARCHIVES Comma separated list of archives to be extracted into the | working dir of each executor.""".stripMargin ) SparkSubmit.exitFn() } } + +object SparkSubmitArguments { + /** Load properties present in the given file. */ + def getPropertiesFromFile(file: File): Seq[(String, String)] = { + require(file.exists(), s"Properties file ${file.getName} does not exist") + val inputStream = new FileInputStream(file) + val properties = new Properties() + try { + properties.load(inputStream) + } catch { + case e: IOException => + val message = s"Failed when loading Spark properties file ${file.getName}" + throw new SparkException(message, e) + } + properties.stringPropertyNames().toSeq.map(k => (k, properties(k))) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index 63f166d401..888dd45e93 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,8 +49,8 @@ private[spark] object TestClient { val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription( - "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), - Some("dummy-spark-home"), "ignored") + "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), + Seq()), Some("dummy-spark-home"), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 0c761dfc93..9103c885fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -46,21 +46,26 @@ object CommandUtils extends Logging { * the way the JAVA_OPTS are assembled there. */ def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = { - val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command) - .map(p => List("-Djava.library.path=" + p)) - .getOrElse(Nil) - val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")) - .map(Utils.splitCommandString).getOrElse(Nil) - val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil) val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") + // Note, this will coalesce multiple options into a single command component + val extraOpts = command.extraJavaOptions.toSeq + val libraryOpts = + if (command.libraryPathEntries.size > 0) { + val joined = command.libraryPathEntries.mkString(File.pathSeparator) + Seq(s"-Djava.library.path=$joined") + } else { + Seq() + } // Figure out our classpath with the external compute-classpath script val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" val classPath = Utils.executeAndGetOutput( Seq(sparkHome + "/bin/compute-classpath" + ext), extraEnvironment=command.environment) + val userClassPath = command.classPathEntries.mkString(File.pathSeparator) + val classPathWithUser = classPath + File.pathSeparator + userClassPath - Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts + Seq("-cp", classPathWithUser) ++ libraryOpts ++ extraOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index b4df1a0dd4..f918b42c83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker import java.io._ import scala.collection.JavaConversions._ -import scala.collection.mutable.Map +import scala.collection.Map import akka.actor.ActorRef import com.google.common.base.Charsets @@ -74,13 +74,17 @@ private[spark] class DriverRunner( // Make sure user application jar is on the classpath // TODO: If we add ability to submit multiple jars they should also be added here - val env = Map(driverDesc.command.environment.toSeq: _*) - env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename" - val newCommand = Command(driverDesc.command.mainClass, - driverDesc.command.arguments.map(substituteVariables), env) + val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename") + val newCommand = Command( + driverDesc.command.mainClass, + driverDesc.command.arguments.map(substituteVariables), + driverDesc.command.environment, + classPath, + driverDesc.command.libraryPathEntries, + driverDesc.command.extraJavaOptions) val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, sparkHome.getAbsolutePath) - launchDriver(command, env, driverDir, driverDesc.supervise) + launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise) } catch { case e: Exception => finalException = Some(e) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2edd921066..f94cd685e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -99,7 +99,9 @@ private[spark] class ExecutorRunner( def getCommandSeq = { val command = Command(appDesc.command.mainClass, - appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment) + appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment, + appDesc.command.classPathEntries, appDesc.command.libraryPathEntries, + appDesc.command.extraJavaOptions) CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f89b2bffd1..2bfb9c387e 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -64,9 +64,10 @@ private[spark] class Executor( // to what Yarn on this system said was available. This will be used later when SparkEnv // created. if (java.lang.Boolean.valueOf( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) - { + System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) { conf.set("spark.local.dir", getYarnLocalDirs()) + } else if (sys.env.contains("SPARK_LOCAL_DIRS")) { + conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS")) } if (!isLocal) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 936e9db805..9544ca05dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -42,11 +42,20 @@ private[spark] class SparkDeploySchedulerBackend( // The endpoint for executors to talk to us val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), conf.get("spark.driver.port"), + conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") + val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") + val classPathEntries = sys.props.get("spark.executor.extraClassPath").toSeq.flatMap { cp => + cp.split(java.io.File.pathSeparator) + } + val libraryPathEntries = sys.props.get("spark.executor.extraLibraryPath").toSeq.flatMap { cp => + cp.split(java.io.File.pathSeparator) + } + val command = Command( - "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) + "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.testExecutorEnvs, + classPathEntries, libraryPathEntries, extraJavaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 06b041e1fd..2cd9d6c12e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -111,7 +111,18 @@ private[spark] class CoarseMesosSchedulerBackend( def createCommand(offer: Offer, numCores: Int): CommandInfo = { val environment = Environment.newBuilder() - sc.executorEnvs.foreach { case (key, value) => + val extraClassPath = conf.getOption("spark.executor.extraClassPath") + extraClassPath.foreach { cp => + environment.addVariables( + Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) + } + val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions") + + val libraryPathOption = "spark.executor.extraLibraryPath" + val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p") + val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ") + + sc.testExecutorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() .setName(key) .setValue(value) @@ -123,20 +134,22 @@ private[spark] class CoarseMesosSchedulerBackend( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) + val uri = conf.get("spark.executor.uri", null) if (uri == null) { val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath command.setValue( - "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( - runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format( + runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.split('/').last.split('.').head command.setValue( ("cd %s*; " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d") - .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d") + .format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue, + offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } command.build() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index dfdcafe19f..c975f31232 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -90,7 +90,7 @@ private[spark] class MesosSchedulerBackend( "Spark home is not set; set it through the spark.home system " + "property, the SPARK_HOME environment variable or the SparkContext constructor")) val environment = Environment.newBuilder() - sc.executorEnvs.foreach { case (key, value) => + sc.testExecutorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() .setName(key) .setValue(value) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 9f2924c23b..bfae32dae0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -87,7 +87,7 @@ class JsonProtocolSuite extends FunSuite { } def createAppDesc(): ApplicationDescription = { - val cmd = new Command("mainClass", List("arg1", "arg2"), Map()) + val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq()) new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl") } @@ -100,7 +100,7 @@ class JsonProtocolSuite extends FunSuite { def createDriverCommand() = new Command( "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"), - Map(("K1", "V1"), ("K2", "V2")) + Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo") ) def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3, @@ -133,9 +133,12 @@ class JsonProtocolSuite extends FunSuite { def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) { val Diff(c, a, d) = validateJson diff expectedJson - assert(c === JNothing, "Json changed") - assert(a === JNothing, "Json added") - assert(d === JNothing, "Json deleted") + val validatePretty = JsonMethods.pretty(validateJson) + val expectedPretty = JsonMethods.pretty(expectedJson) + val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty" + assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}") + assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}") + assert(d === JNothing, s"$errorMessage\nDelected:\n${JsonMethods.pretty(d)}") } } @@ -165,7 +168,7 @@ object JsonConstants { """ |{"name":"name","cores":4,"memoryperslave":1234, |"user":"%s","sparkhome":"sparkHome", - |"command":"Command(mainClass,List(arg1, arg2),Map())"} + |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"} """.format(System.getProperty("user.name", "<unknown>")).stripMargin val executorRunnerJsonStr = diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 4e489cd9b6..f82d717719 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -17,16 +17,16 @@ package org.apache.spark.deploy -import java.io.{OutputStream, PrintStream} +import java.io.{File, OutputStream, PrintStream} import scala.collection.mutable.ArrayBuffer +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException, TestUtils} +import org.apache.spark.deploy.SparkSubmit._ +import org.apache.spark.util.Utils import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.deploy.SparkSubmit._ - - class SparkSubmitSuite extends FunSuite with ShouldMatchers { val noOpOutputStream = new OutputStream { @@ -42,7 +42,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } /** Returns true if the script exits and the given search string is printed. */ - def testPrematureExit(input: Array[String], searchString: String): Boolean = { + def testPrematureExit(input: Array[String], searchString: String) = { val printStream = new BufferPrintStream() SparkSubmit.printStream = printStream @@ -60,28 +60,38 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } thread.start() thread.join() - printStream.lineBuffer.find(s => s.contains(searchString)).size > 0 + val joined = printStream.lineBuffer.mkString("\n") + if (!joined.contains(searchString)) { + fail(s"Search string '$searchString' not found in $joined") + } } test("prints usage on empty input") { - testPrematureExit(Array[String](), "Usage: spark-submit") should be (true) + testPrematureExit(Array[String](), "Usage: spark-submit") } test("prints usage with only --help") { - testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true) + testPrematureExit(Array("--help"), "Usage: spark-submit") + } + + test("prints error with unrecognized option") { + testPrematureExit(Array("--blarg"), "Unrecognized option '--blarg'") + testPrematureExit(Array("-bleg"), "Unrecognized option '-bleg'") + testPrematureExit(Array("--master=abc"), + "Unrecognized option '--master=abc'. Perhaps you want '--master abc'?") } test("handles multiple binary definitions") { val adjacentJars = Array("foo.jar", "bar.jar") - testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true) + testPrematureExit(adjacentJars, "error: Found two conflicting resources") val nonAdjacentJars = Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar") - testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true) + testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") } test("handle binary specified but not class") { - testPrematureExit(Array("foo.jar"), "must specify a main class") + testPrematureExit(Array("foo.jar"), "Must specify a main class") } test("handles YARN cluster mode") { @@ -140,12 +150,11 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) val childArgsStr = childArgs.mkString(" ") - print("child args: " + childArgsStr) childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true) childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2") mainClass should be ("org.apache.spark.deploy.Client") classpath should have length (0) - sysProps should have size (0) + sysProps should have size (1) // contains --jar entry } test("handles standalone client mode") { @@ -175,4 +184,80 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { sysProps("spark.executor.memory") should be ("5g") sysProps("spark.cores.max") should be ("5") } + + test("launch simple application with spark-submit") { + runSparkSubmit( + Seq("unUsed.jar", + "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local")) + } + + test("spark submit includes jars passed in through --jar") { + val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA")) + val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB")) + val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",") + runSparkSubmit( + Seq("unUsed.jar", + "--class", JarCreationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local-cluster[2,1,512]", + "--jars", jarsString)) + } + + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. + def runSparkSubmit(args: Seq[String]): String = { + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get + Utils.executeAndGetOutput( + Seq("./bin/spark-submit") ++ args, + new File(sparkHome), + Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) + } +} + +object JarCreationTest { + def main(args: Array[String]) { + val conf = new SparkConf() + val sc = new SparkContext(conf) + val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x => + var foundClasses = false + try { + Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader) + Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader) + foundClasses = true + } catch { + case _: Throwable => // catch all + } + Seq(foundClasses).iterator + }.collect() + if (result.contains(false)) { + throw new Exception("Could not load user defined classes inside of executors") + } + } +} + +object SimpleApplicationTest { + def main(args: Array[String]) { + val conf = new SparkConf() + val sc = new SparkContext(conf) + + val configs = Seq("spark.master", "spark.app.name") + for (config <- configs) { + val masterValue = conf.get(config) + val executorValues = sc + .makeRDD(1 to 100, 10) + .map(x => SparkEnv.get.conf.get(config)) + .collect() + .distinct + if (executorValues.size != 1) { + throw new SparkException(s"Inconsistent values for $config: $executorValues") + } + val executorValue = executorValues(0) + if (executorValue != masterValue) { + throw new SparkException( + s"Master had $config=$masterValue but executor had $config=$executorValue") + } + } + + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index a2c131b0c9..4633bc3f7f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription} class DriverRunnerTest extends FunSuite { private def createDriverRunner() = { - val command = new Command("mainClass", Seq(), Map()) + val command = new Command("mainClass", Seq(), Map(), Seq(), Seq()) val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription, null, "akka://1.2.3.4/worker/") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 3cab8e7b37..8ae387fa0b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -27,7 +27,8 @@ class ExecutorRunnerTest extends FunSuite { test("command includes appId") { def f(s:String) = new File(s) val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")) - val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()), + val appDesc = new ApplicationDescription("app name", Some(8), 500, + Command("foo", Seq(), Map(), Seq(), Seq()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), |