author     Andrew Or <andrewor14@gmail.com>        2014-07-29 23:52:09 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-07-29 23:52:09 -0700
commit     4ce92ccaf761e48a10fc4fe4927dbfca858ca22b (patch)
tree       af298364e8b7cff17d1fa93c7f60b4883b0fa88c /core
parent     077f633b4720422c5efbf0382e869ead3dc49612 (diff)
download   spark-4ce92ccaf761e48a10fc4fe4927dbfca858ca22b.tar.gz
           spark-4ce92ccaf761e48a10fc4fe4927dbfca858ca22b.tar.bz2
           spark-4ce92ccaf761e48a10fc4fe4927dbfca858ca22b.zip
[SPARK-2260] Fix standalone-cluster mode, which was broken
The main thing was that spark configs were not propagated to the driver, and so applications that do not specify `master` or `appName` automatically failed. This PR fixes that and a couple of related miscellaneous things.

One thing that may or may not be an issue is that the jars must be available on the driver node. In `standalone-cluster` mode, this effectively means these jars must be available on all the worker machines, since the driver is launched on one of them. The semantics here are not the same as `yarn-cluster` mode, where all the relevant jars are uploaded to a distributed cache automatically and shipped to the containers. This is probably not a concern, but still worth a mention.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1538 from andrewor14/standalone-cluster and squashes the following commits:

8c11a0d [Andrew Or] Clean up imports / comments (minor)
2678d13 [Andrew Or] Handle extraJavaOpts properly
7660547 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
6f64a9b [Andrew Or] Revert changes in YARN
2f2908b [Andrew Or] Fix tests
ed01491 [Andrew Or] Don't go overboard with escaping
8e105e1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
b890949 [Andrew Or] Abstract usages of converting spark opts to java opts
79f63a3 [Andrew Or] Move sparkProps into javaOpts
78752f8 [Andrew Or] Fix tests
5a9c6c7 [Andrew Or] Fix line too long
c141a00 [Andrew Or] Don't display "unknown app" on driver log pages
d7e2728 [Andrew Or] Avoid deprecation warning in standalone Client
6ceb14f [Andrew Or] Allow relevant configs to propagate to standalone Driver
7f854bc [Andrew Or] Fix test
855256e [Andrew Or] Fix standalone-cluster mode
fd9da51 [Andrew Or] Formatting changes (minor)
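For illustration only, a minimal sketch of the propagation approach this patch takes (the config value is hypothetical, not from the patch): spark configs are rendered as -D java options and prepended to the driver command, so the SparkConf constructed on the driver picks them up from system properties.

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.app.name", "MyApp")  // hypothetical setting
    // Mirrors what the new Utils.sparkJavaOpts helper does: one -D option per config entry.
    val sparkJavaOpts = conf.getAll.map { case (k, v) => s"-D$k=$v" }.toSeq
    // e.g. Seq("-Dspark.app.name=MyApp", ...); Client then builds
    // javaOpts = sparkJavaOpts ++ extraJavaOpts for the DriverWrapper command.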
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Command.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala | 2
16 files changed, 93 insertions, 51 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ce4b91cae..38700847c8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+ import SparkConf._
+
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
@@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
*
* E.g. spark.akka.option.x.y.x = "value"
*/
- getAll.filter {case (k, v) => k.startsWith("akka.")}
+ getAll.filter { case (k, _) => isAkkaConf(k) }
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
@@ -292,3 +294,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
}
+
+private[spark] object SparkConf {
+ /**
+ * Return whether the given config is an akka config (e.g. akka.actor.provider).
+ * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
+ */
+ def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
+
+ /**
+ * Return whether the given config should be passed to an executor on start-up.
+ *
+ * Certain akka and authentication configs are required of the executor when it connects to
+ * the scheduler, while the rest of the spark configs can be inherited from the driver later.
+ */
+ def isExecutorStartupConf(name: String): Boolean = {
+ isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+ }
+}
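A hedged usage sketch of the two new helpers (the config values are hypothetical, not from the patch): isExecutorStartupConf keeps only the akka and authentication settings an executor needs before it registers with the scheduler, while everything else is inherited from the driver later.

    val conf = new SparkConf()
      .set("spark.authenticate", "true")    // matches the "spark.auth" prefix, kept
      .set("spark.shuffle.spill", "false")  // no match, inherited from the driver later
    val startupConfs = conf.getAll.filter { case (k, _) => SparkConf.isExecutorStartupConf(k) }
    // startupConfs contains ("spark.authenticate", "true") only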
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c371dc3a51..17c507af26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,8 +17,6 @@
package org.apache.spark.deploy
-import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
import scala.concurrent._
import akka.actor._
@@ -50,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
// truncate filesystem paths similar to what YARN does. For now, we just require
// people call `addJar` assuming the jar is in the same directory.
- val env = Map[String, String]()
- System.getenv().foreach{case (k, v) => env(k) = v}
-
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
val classPathConf = "spark.driver.extraClassPath"
@@ -65,10 +60,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
cp.split(java.io.File.pathSeparator)
}
- val javaOptionsConf = "spark.driver.extraJavaOptions"
- val javaOpts = sys.props.get(javaOptionsConf)
+ val extraJavaOptsConf = "spark.driver.extraJavaOptions"
+ val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+ .map(Utils.splitCommandString).getOrElse(Seq.empty)
+ val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+ val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
+ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
@@ -109,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// Exception, if present
statusResponse.exception.map { e =>
println(s"Exception from cluster was: $e")
+ e.printStackTrace()
System.exit(-1)
}
System.exit(0)
@@ -141,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
*/
object Client {
def main(args: Array[String]) {
- println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
- println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+ if (!sys.props.contains("SPARK_SUBMIT")) {
+ println("WARNING: This client is deprecated and will be removed in a future version of Spark")
+ println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+ }
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
index 32f3ba3850..a2b263544c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Command.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -25,5 +25,5 @@ private[spark] case class Command(
environment: Map[String, String],
classPathEntries: Seq[String],
libraryPathEntries: Seq[String],
- extraJavaOptions: Option[String] = None) {
+ javaOpts: Seq[String]) {
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c9cec33eba..3df811c4ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,8 +136,6 @@ object SparkSubmit {
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
- case (STANDALONE, CLUSTER) =>
- printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -170,9 +168,9 @@ object SparkSubmit {
val options = List[OptionAssigner](
// All cluster managers
- OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
- OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
- OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+ OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+ OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
+ OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
// Standalone cluster only
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
@@ -203,9 +201,9 @@ object SparkSubmit {
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
- OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
+ OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
- OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
+ OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index e15a87bd38..b8ffa9afb6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,11 +46,11 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val conf = new SparkConf
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+ val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
- "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
- Seq()), Some("dummy-spark-home"), "ignored")
+ "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
+ Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 4af5bc3afa..687e492a0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
- val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())
// Exists for backwards compatibility with older Spark versions
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
Seq(s"-Djava.library.path=$joined")
} else {
- Seq()
+ Seq()
}
val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@ object CommandUtils extends Logging {
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment=command.environment)
+ extraEnvironment = command.environment)
val userClassPath = command.classPathEntries ++ Seq(classPath)
Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+ permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 662d37871e..5caaf6bea3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
+ * This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -81,7 +82,7 @@ private[spark] class DriverRunner(
driverDesc.command.environment,
classPath,
driverDesc.command.libraryPathEntries,
- driverDesc.command.extraJavaOptions)
+ driverDesc.command.javaOpts)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 467317dd9b..7be89f9aff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender
/**
* Manages the execution of one executor process.
+ * This is currently only used in standalone mode.
*/
private[spark] class ExecutorRunner(
val appId: String,
@@ -72,7 +73,7 @@ private[spark] class ExecutorRunner(
}
/**
- * kill executor process, wait for exit and notify worker to update resource status
+ * Kill executor process, wait for exit and notify worker to update resource status.
*
* @param message the exception message which caused the executor's death
*/
@@ -114,10 +115,13 @@ private[spark] class ExecutorRunner(
}
def getCommandSeq = {
- val command = Command(appDesc.command.mainClass,
- appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
- appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
- appDesc.command.extraJavaOptions)
+ val command = Command(
+ appDesc.command.mainClass,
+ appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+ appDesc.command.environment,
+ appDesc.command.classPathEntries,
+ appDesc.command.libraryPathEntries,
+ appDesc.command.javaOpts)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index b389cb546d..ecb358c399 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy.worker.ui
-import java.io.File
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
@@ -25,7 +24,7 @@ import scala.xml.Node
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.Logging
-import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
+import org.apache.spark.util.logging.RollingFileAppender
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val (logDir, params) = (appId, executorId, driverId) match {
+ val (logDir, params, pageName) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
+ (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
case (None, None, Some(d)) =>
- (s"${workDir.getPath}/$d/", s"driverId=$d")
+ (s"${workDir.getPath}/$d/", s"driverId=$d", d)
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
</div>
</body>
</html>
- UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app"))
+ UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
}
/** Get the part of the log files given the offset and desired length of bytes */
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b455c9fcf4..860b47e056 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend(
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {
- def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
- workerUrl: Option[String]) {
+
+ private def run(
+ driverUrl: String,
+ executorId: String,
+ hostname: String,
+ cores: Int,
+ workerUrl: Option[String]) {
SignalLogger.register(log)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bf2dc88e29..48aaaa54bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -46,6 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+ .map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
@@ -54,9 +55,11 @@ private[spark] class SparkDeploySchedulerBackend(
cp.split(java.io.File.pathSeparator)
}
- val command = Command(
- "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
- classPathEntries, libraryPathEntries, extraJavaOpts)
+ // Start executors with a few necessary configs for registering with the scheduler
+ val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
+ val javaOpts = sparkJavaOpts ++ extraJavaOpts
+ val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+ args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8cbb9050f3..69f65b4bdc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1313,4 +1313,13 @@ private[spark] object Utils extends Logging {
s"$className: $desc\n$st"
}
+ /**
+ * Convert all spark properties set in the given SparkConf to a sequence of java options.
+ */
+ def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = {
+ conf.getAll
+ .filter { case (k, _) => filterKey(k) }
+ .map { case (k, v) => s"-D$k=$v" }
+ }
+
}
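Combining the new helper with the SparkConf predicate above, a hypothetical call site (mirroring what SparkDeploySchedulerBackend does in this patch) would be:

    // Only configs needed at executor start-up become -D options here;
    // the remaining spark configs reach the executor from the driver later.
    val startupOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    // e.g. Seq("-Dspark.authenticate=true") for the conf sketched earlier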
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 01ab2d5493..093394ad6d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite {
}
def createAppDesc(): ApplicationDescription = {
- val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
+ val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}
@@ -101,7 +101,7 @@ class JsonProtocolSuite extends FunSuite {
def createDriverCommand() = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
- Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
+ Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)
def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
@@ -170,7 +170,7 @@ object JsonConstants {
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
- |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
+ |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin
val executorRunnerJsonStr =
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index f497a5e0a1..a301cbd48a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -200,9 +200,12 @@ class SparkSubmitSuite extends FunSuite with Matchers {
childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.Client")
classpath should have size (0)
- sysProps should have size (3)
- sysProps.keys should contain ("spark.jars")
+ sysProps should have size (5)
sysProps.keys should contain ("SPARK_SUBMIT")
+ sysProps.keys should contain ("spark.master")
+ sysProps.keys should contain ("spark.app.name")
+ sysProps.keys should contain ("spark.jars")
+ sysProps.keys should contain ("spark.shuffle.spill")
sysProps("spark.shuffle.spill") should be ("false")
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index 4633bc3f7f..c930839b47 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
- val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
+ val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
null, "akka://1.2.3.4/worker/")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index e5f748d555..ca4d987619 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
- Command("foo", Seq(), Map(), Seq(), Seq()),
+ Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),