-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala  44
-rw-r--r--  docs/running-on-mesos.md  11
2 files changed, 33 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2189fca67a..bb6f6b3e3f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler(
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
-    m.updated(k, f(m.getOrElse(k, default)))
-  }
-
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
     s"${frameworkId}-${desc.submissionId}"
   }
 
-  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
-    val env = {
-      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
-      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
 
-      var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-        v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
-      )
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    // TODO(mgummelt): Don't do this here. This should be passed as a --conf
+    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
 
-      driverEnv ++ executorEnv ++ commandEnv
-    }
+    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
     env.foreach { case (k, v) =>
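The relocated `adjust` helper appends to an environment variable whether or not the variable is already set, which is what lets the hunk above tack the framework id onto SPARK_SUBMIT_OPTS unconditionally. A minimal runnable sketch of that behavior, using plain Scala collections and an invented framework id:

object AdjustSketch {
  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) =
    m.updated(k, f(m.getOrElse(k, default)))

  def main(args: Array[String]): Unit = {
    val withOpts = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")
    // Key present: the new system property is appended to the existing value.
    println(adjust(withOpts, "SPARK_SUBMIT_OPTS", "")(
      v => s"$v -Dspark.mesos.driver.frameworkId=fw-driver-001"))
    // Key absent: the default ("") is used as the starting value instead.
    println(adjust(Map.empty[String, String], "SPARK_SUBMIT_OPTS", "")(
      v => s"$v -Dspark.mesos.driver.frameworkId=fw-driver-001"))
  }
}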
@@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler(
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
 
-    val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
-      "spark.master" // this contains the address of the dispatcher, not master
-    )
-
     // Assume empty main class means we're running python
     if (!desc.command.mainClass.equals("")) {
       options ++= Seq("--class", desc.command.mainClass)
@@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler(
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.conf.getAll
+
+    // --conf
+    val replicatedOptionsBlacklist = Set(
+      "spark.jars", // Avoids duplicate classes in classpath
+      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      "spark.master" // this contains the address of the dispatcher, not master
+    )
+    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-      .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+      .toMap
+    (defaultConf ++ driverConf).foreach { case (key, value) =>
+      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
     options
   }
 
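In the new --conf block above, operand order decides precedence: in `defaultConf ++ driverConf`, keys from the right-hand map overwrite keys from the left, so settings submitted with an individual driver override the dispatcher-wide defaults. A small self-contained sketch with invented values:

object DriverConfPrecedence {
  def main(args: Array[String]): Unit = {
    // Dispatcher-wide defaults, i.e. what the driverDefault prefix lookup
    // would yield (values invented for illustration).
    val defaultConf = Map(
      "spark.executor.memory" -> "32g",
      "spark.eventLog.enabled" -> "true")
    // Configuration carried by one submitted driver.
    val driverConf = Map("spark.executor.memory" -> "8g")
    // The right-hand operand of ++ wins on duplicate keys, so the
    // driver's own 8g overrides the dispatcher default of 32g.
    println(defaultConf ++ driverConf)
    // Map(spark.executor.memory -> 8g, spark.eventLog.enabled -> true)
  }
}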
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 613da68531..a6ce34c761 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -467,6 +467,17 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the Spark Mesos dispatcher webui_url for interacting with the framework.
     If unset it will point to Spark's internal web UI.
   </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.dispatcher.driverDefault.[PropertyName]</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set default properties for drivers submitted through the
+    dispatcher. For example, setting
+    <code>spark.mesos.dispatcher.driverDefault.spark.executor.memory=32g</code>
+    results in the executors for all drivers submitted in cluster mode
+    running in 32g containers.
+  </td>
 </tr>
 <tr>
   <td><code>spark.mesos.dispatcher.historyServer.url</code></td>
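The driverDefault prefix is stripped before the remaining key is forwarded as a --conf option, which is how spark.mesos.dispatcher.driverDefault.spark.executor.memory=32g becomes spark.executor.memory=32g for each submitted driver. A sketch of that filter-and-strip step over a plain map; the helper here is a hypothetical stand-in for the SparkConf.getAllWithPrefix call used in the Scala hunk above:

object DriverDefaultPrefix {
  // Keep entries whose key starts with the prefix, then strip the prefix.
  def getAllWithPrefix(conf: Map[String, String], prefix: String): Map[String, String] =
    conf.collect { case (k, v) if k.startsWith(prefix) => (k.stripPrefix(prefix), v) }

  def main(args: Array[String]): Unit = {
    val dispatcherConf = Map(
      "spark.mesos.dispatcher.driverDefault.spark.executor.memory" -> "32g",
      "spark.master" -> "mesos://203.0.113.1:5050") // invented address
    println(getAllWithPrefix(dispatcherConf, "spark.mesos.dispatcher.driverDefault."))
    // Map(spark.executor.memory -> 32g)
  }
}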