aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala86
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala47
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala93
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala71
-rw-r--r--docs/running-on-mesos.md10
5 files changed, 201 insertions, 106 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 73bd4c58e1..39b0f4d0e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -353,38 +353,60 @@ private[spark] class MesosClusterScheduler(
}
}
- private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
- val appJar = CommandInfo.URI.newBuilder()
- .setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
- val builder = CommandInfo.newBuilder().addUris(appJar)
- val entries = conf.getOption("spark.executor.extraLibraryPath")
- .map(path => Seq(path) ++ desc.command.libraryPathEntries)
- .getOrElse(desc.command.libraryPathEntries)
-
- val prefixEnv = if (!entries.isEmpty) {
- Utils.libraryPathEnvPrefix(entries)
- } else {
- ""
+ private def getDriverExecutorURI(desc: MesosDriverDescription) = {
+ desc.schedulerProperties.get("spark.executor.uri")
+ .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+ }
+
+ private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+ val env = {
+ val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+ val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
+
+ val prefix = "spark.mesos.driverEnv."
+ val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
+ .map { case (k, v) => (k.substring(prefix.length), v) }
+
+ driverEnv ++ executorEnv ++ desc.command.environment
}
+
val envBuilder = Environment.newBuilder()
- desc.command.environment.foreach { case (k, v) =>
- envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v).build())
+ env.foreach { case (k, v) =>
+ envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
}
- // Pass all spark properties to executor.
- val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
- envBuilder.addVariables(
- Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
+ envBuilder.build()
+ }
+
+ private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
+ val confUris = List(conf.getOption("spark.mesos.uris"),
+ desc.schedulerProperties.get("spark.mesos.uris"),
+ desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
+ _.map(_.split(",").map(_.trim))
+ ).flatten
+
+ val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
+
+ ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
+ CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+ }
+
+ private def getDriverCommandValue(desc: MesosDriverDescription): String = {
val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
- val executorUri = desc.schedulerProperties.get("spark.executor.uri")
- .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+ val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
// Application jar is automatically downloaded in the mounted sandbox by Mesos,
// and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
("./bin/spark-submit", "$MESOS_SANDBOX")
} else if (executorUri.isDefined) {
- builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
val folderBasename = executorUri.get.split('/').last.split('.').head
+
+ val entries = conf.getOption("spark.executor.extraLibraryPath")
+ .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+ .getOrElse(desc.command.libraryPathEntries)
+
+ val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else ""
+
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
@@ -399,20 +421,18 @@ private[spark] class MesosClusterScheduler(
// Sandbox points to the current directory by default with Mesos.
(cmdExecutable, ".")
}
- val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
+ val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
val appArguments = desc.command.arguments.mkString(" ")
- builder.setValue(s"$executable $cmdOptions $primaryResource $appArguments")
- builder.setEnvironment(envBuilder.build())
- conf.getOption("spark.mesos.uris").map { uris =>
- setupUris(uris, builder)
- }
- desc.schedulerProperties.get("spark.mesos.uris").map { uris =>
- setupUris(uris, builder)
- }
- desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
- setupUris(pyFiles, builder)
- }
+
+ s"$executable $cmdOptions $primaryResource $appArguments"
+ }
+
+ private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
+ val builder = CommandInfo.newBuilder()
+ builder.setValue(getDriverCommandValue(desc))
+ builder.setEnvironment(getDriverEnvironment(desc))
+ builder.addAllUris(getDriverUris(desc).asJava)
builder.build()
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index a32423dc4f..0260759027 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -31,17 +31,24 @@ import org.scalatest.mock.MockitoSugar
import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
-
+import org.apache.spark.scheduler.cluster.mesos.Utils
class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+ private var driver: SchedulerDriver = _
private var scheduler: MesosClusterScheduler = _
- override def beforeEach(): Unit = {
+ private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
+
+ if (sparkConfVars != null) {
+ conf.setAll(sparkConfVars)
+ }
+
+ driver = mock[SchedulerDriver]
scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true }
@@ -50,9 +57,11 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can queue drivers") {
+ setScheduler()
+
val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
assert(response.success)
val response2 =
scheduler.submitDriver(new MesosDriverDescription(
@@ -65,6 +74,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can kill queued drivers") {
+ setScheduler()
+
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date()))
@@ -76,6 +87,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can handle multiple roles") {
+ setScheduler()
+
val driver = mock[SchedulerDriver]
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
@@ -138,6 +151,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("escapes commandline args for the shell") {
+ setScheduler()
+
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
@@ -172,4 +187,28 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this"))
})
}
+
+ test("supports spark.mesos.driverEnv.*") {
+ setScheduler()
+
+ val mem = 1000
+ val cpu = 1
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
+ "s1",
+ new Date()))
+ assert(response.success)
+
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, List(offer).asJava)
+ val tasks = Utils.verifyTaskLaunched(driver, "o1")
+ val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v =>
+ (v.getName, v.getValue)).toMap
+ assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 7f21d4c623..c2779d7b35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -24,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
@@ -35,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.mesos.Utils._
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
@@ -59,7 +59,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// launches a task on a valid offer
offerResources(offers)
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
// kills executors
backend.doRequestTotalExecutors(0)
@@ -74,7 +74,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// Launches a new task when requested executors is positive
backend.doRequestTotalExecutors(2)
offerResources(offers, 2)
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o2")
}
test("mesos supports killing and relaunching tasks with executors") {
@@ -86,7 +86,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offer1 = (minMem, minCpu)
val offer2 = (minMem, 1)
offerResources(List(offer1, offer2))
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
// accounts for a killed task
val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
@@ -95,7 +95,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// Launches a new task on a valid offer from the same slave
offerResources(List(offer2))
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o2")
}
test("mesos supports spark.executor.cores") {
@@ -106,10 +106,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offers = List((executorMemory * 2, executorCores + 1))
offerResources(offers)
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 1)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
assert(cpus == executorCores)
}
@@ -120,10 +120,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offerCores = 10
offerResources(List((executorMemory * 2, offerCores)))
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 1)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
assert(cpus == offerCores)
}
@@ -134,10 +134,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val executorMemory = backend.executorMemory(sc)
offerResources(List((executorMemory, maxCores + 1)))
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 1)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
assert(cpus == maxCores)
}
@@ -156,7 +156,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
(executorMemory, maxCores + 1),
(executorMemory, maxCores + 1)))
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
verifyDeclinedOffer(driver, createOfferId("o2"), true)
}
@@ -171,8 +171,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
(executorMemory * 2, executorCores * 2),
(executorMemory * 2, executorCores * 2)))
- verifyTaskLaunched("o1")
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o1")
+ verifyTaskLaunched(driver, "o2")
}
test("mesos creates multiple executors on a single slave") {
@@ -184,8 +184,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
offerResources(List((executorMemory * 2, executorCores * 2)))
// verify two executors were started on a single offer
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 2)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 2)
}
test("mesos doesn't register twice with the same shuffle service") {
@@ -194,11 +194,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
val offer2 = createOffer("o2", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer2).asJava)
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o2")
val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
backend.statusUpdate(driver, status1)
@@ -216,7 +216,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
backend.doKillExecutors(List("0"))
verify(driver, times(1)).killTask(createTaskId("0"))
@@ -269,14 +269,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.resourceOffers(driver, mesosOffers.asJava)
}
- private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = {
- val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(createOfferId(offerId))),
- captor.capture())
- captor.getValue
- }
-
private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId).build())
@@ -285,41 +277,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.build
}
-
- private def createOfferId(offerId: String): OfferID = {
- OfferID.newBuilder().setValue(offerId).build()
- }
-
- private def createSlaveId(slaveId: String): SlaveID = {
- SlaveID.newBuilder().setValue(slaveId).build()
- }
-
- private def createExecutorId(executorId: String): ExecutorID = {
- ExecutorID.newBuilder().setValue(executorId).build()
- }
-
- private def createTaskId(taskId: String): TaskID = {
- TaskID.newBuilder().setValue(taskId).build()
- }
-
- private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(mem))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(createOfferId(offerId))
- .setFrameworkId(FrameworkID.newBuilder()
- .setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
- .setHostname(s"host${slaveId}")
- .build()
- }
-
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver,
@@ -364,9 +321,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.set("spark.mesos.driver.webui.url", "http://webui")
if (sparkConfVars != null) {
- for (attr <- sparkConfVars) {
- sparkConf.set(attr._1, attr._2)
- }
+ sparkConf.setAll(sparkConfVars)
}
sc = new SparkContext(sparkConf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000000..ff26d14ef5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import scala.collection.JavaConverters._
+
+object Utils {
+ def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(mem))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(cpu))
+ builder.setId(createOfferId(offerId))
+ .setFrameworkId(FrameworkID.newBuilder()
+ .setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+ .setHostname(s"host${slaveId}")
+ .build()
+ }
+
+ def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+ val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(createOfferId(offerId))),
+ captor.capture())
+ captor.getValue.asScala.toList
+ }
+
+ def createOfferId(offerId: String): OfferID = {
+ OfferID.newBuilder().setValue(offerId).build()
+ }
+
+ def createSlaveId(slaveId: String): SlaveID = {
+ SlaveID.newBuilder().setValue(slaveId).build()
+ }
+
+ def createExecutorId(executorId: String): ExecutorID = {
+ ExecutorID.newBuilder().setValue(executorId).build()
+ }
+
+ def createTaskId(taskId: String): TaskID = {
+ TaskID.newBuilder().setValue(taskId).build()
+ }
+}
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5219e99fee..10dc9ce890 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -429,6 +429,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ This only affects drivers submitted in cluster mode. Add the
+ environment variable specified by EnvironmentVariableName to the
+ driver process. The user can specify multiple of these to set
+ multiple environment variables.
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.dispatcher.webui.url</code></td>
<td><code>(none)</code></td>
<td>