field | value | date
---|---|---
author | Timothy Chen <tnachen@gmail.com> | 2016-08-10 10:11:03 +0100
committer | Sean Owen <sowen@cloudera.com> | 2016-08-10 10:11:03 +0100
commit | eca58755fbbc11937b335ad953a3caff89b818e6 (patch) |
tree | 13274396606247644ee9de7f68cc791cac6051f8 /core/src/main |
parent | bfda53f63a31bf2e8b72ab9e85896a4bec1644e8 (diff) |
[SPARK-16927][SPARK-16923] Override task properties at dispatcher.
## What changes were proposed in this pull request?
- Enable setting default properties for all jobs submitted through the dispatcher, read from the new `spark.mesos.dispatcher.driverDefault.*` conf prefix [SPARK-16927]; see the sketch after this list.
- Remove duplication of conf vars on cluster-submitted jobs [SPARK-16923] (a small fix, so it is included in the same PR).
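A minimal sketch of how the new prefix behaves, using `SparkConf.getAllWithPrefix` as the patch does; the property name and value below are illustrative, only the prefix comes from the patch:

```scala
import org.apache.spark.SparkConf

// Sketch of the new dispatcher-default lookup, under the assumption of
// an illustrative property (spark.executor.memory=4g).
object DriverDefaultsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.mesos.dispatcher.driverDefault.spark.executor.memory", "4g")

    // getAllWithPrefix returns matching entries with the prefix stripped,
    // here Array(("spark.executor.memory", "4g")).
    val defaults = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap

    // Each default is then forwarded as a --conf flag on the driver command line.
    defaults.foreach { case (k, v) => println(s"--conf $k=$v") }
  }
}
```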
## How was this patch tested?
- mesos/spark integration test suite
- manual testing
Author: Timothy Chen <tnachen@gmail.com>
Closes #14511 from mgummelt/override-props.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 44
1 file changed, 22 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2189fca67a..bb6f6b3e3f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler(
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
-    m.updated(k, f(m.getOrElse(k, default)))
-  }
-
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
     s"${frameworkId}-${desc.submissionId}"
   }
 
-  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
-    val env = {
-      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
-      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
 
-      var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-        v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
-      )
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    // TODO(mgummelt): Don't do this here. This should be passed as a --conf
+    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
 
-      driverEnv ++ executorEnv ++ commandEnv
-    }
+    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
     env.foreach { case (k, v) =>
@@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler(
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
-    val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
-      "spark.master" // this contains the address of the dispatcher, not master
-    )
-
     // Assume empty main class means we're running python
     if (!desc.command.mainClass.equals("")) {
       options ++= Seq("--class", desc.command.mainClass)
     }
@@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler(
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.conf.getAll
+
+    // --conf
+    val replicatedOptionsBlacklist = Set(
+      "spark.jars", // Avoids duplicate classes in classpath
+      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      "spark.master" // this contains the address of the dispatcher, not master
+    )
+    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-      .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+      .toMap
+    (defaultConf ++ driverConf).foreach { case (key, value) =>
+      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
     options
   }
 
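A note on precedence in the new `--conf` assembly: `defaultConf ++ driverConf` uses Scala's map concatenation, which keeps the right-hand value on duplicate keys, so a property set on an individual submission overrides the dispatcher-wide default. A standalone sketch of that semantics (property names and values are illustrative):

```scala
// Sketch of the merge semantics behind (defaultConf ++ driverConf).
object ConfPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    // Defaults as read from spark.mesos.dispatcher.driverDefault.* (illustrative values)
    val defaultConf = Map(
      "spark.executor.memory"  -> "4g",
      "spark.eventLog.enabled" -> "true")

    // Conf submitted with one particular driver
    val driverConf = Map("spark.executor.memory" -> "8g")

    // ++ keeps the right operand's value on a key collision: the driver's
    // 8g wins, while the untouched eventLog default still applies.
    val merged = defaultConf ++ driverConf

    merged.foreach { case (k, v) => println(s"--conf $k=$v") }
    // --conf spark.executor.memory=8g
    // --conf spark.eventLog.enabled=true   (iteration order may vary)
  }
}
```

Reversing the operands would let dispatcher defaults silently clobber user-supplied settings, which is presumably why the per-driver conf sits on the right.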