author     Timothy Chen <tnachen@gmail.com>    2016-08-10 10:11:03 +0100
committer  Sean Owen <sowen@cloudera.com>      2016-08-10 10:11:03 +0100
commit     eca58755fbbc11937b335ad953a3caff89b818e6 (patch)
tree       13274396606247644ee9de7f68cc791cac6051f8 /core
parent     bfda53f63a31bf2e8b72ab9e85896a4bec1644e8 (diff)
[SPARK-16927][SPARK-16923] Override task properties at dispatcher.
## What changes were proposed in this pull request?

- Enable setting default properties for all jobs submitted through the dispatcher [SPARK-16927]
- Remove duplication of conf vars on cluster-submitted jobs [SPARK-16923] (this is a small fix, so I'm including it in the same PR)

## How was this patch tested?

mesos/spark integration test suite; manual testing

Author: Timothy Chen <tnachen@gmail.com>

Closes #14511 from mgummelt/override-props.
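For orientation before the diff: SPARK-16927 lets an operator set dispatcher-wide driver defaults under the `spark.mesos.dispatcher.driverDefault.` prefix, and the scheduler merges them as `defaultConf ++ driverConf`, so a job's own `--conf` values win on conflicting keys. A minimal, self-contained sketch of that precedence (the keys and values are invented for illustration, not taken from the patch):

```scala
object DriverConfPrecedence extends App {
  // Dispatcher-wide defaults, as read from conf keys prefixed with
  // spark.mesos.dispatcher.driverDefault. (values here are made up).
  val defaultConf = Map(
    "spark.executor.memory"  -> "4g",
    "spark.eventLog.enabled" -> "true")

  // Conf carried by the submitted job; merged last, so it overrides the
  // dispatcher defaults wherever the keys collide.
  val driverConf = Map("spark.executor.memory" -> "8g")

  val merged = defaultConf ++ driverConf
  assert(merged("spark.executor.memory") == "8g")    // job setting wins
  assert(merged("spark.eventLog.enabled") == "true") // default preserved
}
```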
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 44
1 file changed, 22 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2189fca67a..bb6f6b3e3f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler(
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}
- private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
- m.updated(k, f(m.getOrElse(k, default)))
- }
-
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
s"${frameworkId}-${desc.submissionId}"
}
- private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
- val env = {
- val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
- val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
- val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
+ private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+ m.updated(k, f(m.getOrElse(k, default)))
+ }
- var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
- v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
- )
+ private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+ // TODO(mgummelt): Don't do this here. This should be passed as a --conf
+ val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+ v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+ )
- driverEnv ++ executorEnv ++ commandEnv
- }
+ val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
val envBuilder = Environment.newBuilder()
env.foreach { case (k, v) =>
@@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler(
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
- val replicatedOptionsBlacklist = Set(
- "spark.jars", // Avoids duplicate classes in classpath
- "spark.submit.deployMode", // this would be set to `cluster`, but we need client
- "spark.master" // this contains the address of the dispatcher, not master
- )
-
// Assume empty main class means we're running python
if (!desc.command.mainClass.equals("")) {
options ++= Seq("--class", desc.command.mainClass)
@@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler(
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
- desc.conf.getAll
+
+ // --conf
+ val replicatedOptionsBlacklist = Set(
+ "spark.jars", // Avoids duplicate classes in classpath
+ "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+ "spark.master" // this contains the address of the dispatcher, not master
+ )
+ val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+ val driverConf = desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
- .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+ .toMap
+ (defaultConf ++ driverConf).foreach { case (key, value) =>
+ options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
options
}
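The first hunk also moves the small `adjust` helper next to its only caller, where it appends the driver's framework ID to `SPARK_SUBMIT_OPTS`. A minimal sketch of that mechanism in isolation (the environment map and framework ID are invented for illustration):

```scala
object SubmitOptsAdjust extends App {
  // The adjust helper from the patch: update key k in map m by applying f
  // to the existing value, or to `default` if the key is absent.
  def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) =
    m.updated(k, f(m.getOrElse(k, default)))

  // Made-up driver command environment and framework ID.
  val env = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")
  val frameworkId = "dispatcher-0001-driver-20160810"

  val commandEnv = adjust(env, "SPARK_SUBMIT_OPTS", "")(
    v => s"$v -Dspark.mesos.driver.frameworkId=$frameworkId")

  println(commandEnv("SPARK_SUBMIT_OPTS"))
  // -Xmx1g -Dspark.mesos.driver.frameworkId=dispatcher-0001-driver-20160810
}
```

Because `adjust` falls back to the empty-string default, the property is appended whether or not `SPARK_SUBMIT_OPTS` was already present in the environment.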