diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 44 | ||||
-rw-r--r-- | docs/running-on-mesos.md | 11 |
2 files changed, 33 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 2189fca67a..bb6f6b3e3f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler( .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) } - private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = { - m.updated(k, f(m.getOrElse(k, default))) - } - private def getDriverFrameworkID(desc: MesosDriverDescription): String = { s"${frameworkId}-${desc.submissionId}" } - private def getDriverEnvironment(desc: MesosDriverDescription): Environment = { - val env = { - val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ") - val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts) - val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") + private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = { + m.updated(k, f(m.getOrElse(k, default))) + } - var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")( - v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}" - ) + private def getDriverEnvironment(desc: MesosDriverDescription): Environment = { + // TODO(mgummelt): Don't do this here. This should be passed as a --conf + val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")( + v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}" + ) - driverEnv ++ executorEnv ++ commandEnv - } + val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv val envBuilder = Environment.newBuilder() env.foreach { case (k, v) => @@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler( "--driver-cores", desc.cores.toString, "--driver-memory", s"${desc.mem}M") - val replicatedOptionsBlacklist = Set( - "spark.jars", // Avoids duplicate classes in classpath - "spark.submit.deployMode", // this would be set to `cluster`, but we need client - "spark.master" // this contains the address of the dispatcher, not master - ) - // Assume empty main class means we're running python if (!desc.command.mainClass.equals("")) { options ++= Seq("--class", desc.command.mainClass) @@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler( .mkString(",") options ++= Seq("--py-files", formattedFiles) } - desc.conf.getAll + + // --conf + val replicatedOptionsBlacklist = Set( + "spark.jars", // Avoids duplicate classes in classpath + "spark.submit.deployMode", // this would be set to `cluster`, but we need client + "spark.master" // this contains the address of the dispatcher, not master + ) + val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap + val driverConf = desc.conf.getAll .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) } - .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") } + .toMap + (defaultConf ++ driverConf).foreach { case (key, value) => + options ++= Seq("--conf", s"$key=${shellEscape(value)}") } + options } diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 613da68531..a6ce34c761 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -467,6 +467,17 @@ See the [configuration page](configuration.html) for information on Spark config Set the Spark Mesos dispatcher webui_url for interacting with the framework. If unset it will point to Spark's internal web UI. </td> + </tr> +<tr> + <td><code>spark.mesos.dispatcher.driverDefault.[PropertyName]</code></td> + <td><code>(none)</code></td> + <td> + Set default properties for drivers submitted through the + dispatcher. For example, + spark.mesos.dispatcher.driverProperty.spark.executor.memory=32g + results in the executors for all drivers submitted in cluster mode + to run in 32g containers. +</td> </tr> <tr> <td><code>spark.mesos.dispatcher.historyServer.url</code></td> |