-rw-r--r--  docs/running-on-mesos.md                                                                            27
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala            8
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala  23
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala     9
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala          120
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala          26
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala  19
7 files changed, 131 insertions, 101 deletions
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 923d8dbebf..8d5ad12cb8 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -368,17 +368,6 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
- <td><code>spark.mesos.executor.docker.portmaps</code></td>
- <td>(none)</td>
- <td>
- Set the list of incoming ports exposed by the Docker image, which was set using
- <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
- mappings which take the form:
-
- <pre>host_port:container_port[:tcp|:udp]</pre>
- </td>
-</tr>
-<tr>
<td><code>spark.mesos.executor.home</code></td>
<td>driver side <code>SPARK_HOME</code></td>
<td>
@@ -505,12 +494,26 @@ See the [configuration page](configuration.html) for information on Spark config
    Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found
    since this configuration is just an upper limit and not a guaranteed amount.
</td>
+ </tr>
+<tr>
+ <td><code>spark.mesos.network.name</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ Attach containers to the given named network. If this job is
+ launched in cluster mode, also launch the driver in the given named
+ network. See
+ <a href="http://mesos.apache.org/documentation/latest/cni/">the Mesos CNI docs</a>
+ for more details.
+ </td>
</tr>
<tr>
<td><code>spark.mesos.fetcherCache.enable</code></td>
<td><code>false</code></td>
<td>
- If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/)
+ If set to `true`, all URIs (example: `spark.executor.uri`,
+ `spark.mesos.uris`) will be cached by the <a
+ href="http://mesos.apache.org/documentation/latest/fetcher/">Mesos
+    Fetcher Cache</a>.
</td>
</tr>
</table>
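
Not part of the patch: a minimal sketch of how a user might set the two options documented above in a SparkConf. The master URL and the network name "my-cni-net" are placeholders, not values from this commit.

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration; "my-cni-net" must name an existing Mesos CNI network.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")        // placeholder Mesos master URL
  .setAppName("cni-example")
  .set("spark.mesos.network.name", "my-cni-net")   // attach driver/executor containers to this network
  .set("spark.mesos.fetcherCache.enable", "true")  // cache fetched URIs via the Mesos fetcher cache
```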
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8db1d126d5..f384290a6f 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -531,13 +531,7 @@ private[spark] class MesosClusterScheduler(
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
-
- desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
- desc.conf,
- taskInfo.getContainerBuilder)
- }
-
+ taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
taskInfo.build
}
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 842c05e7bf..3258b09c06 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -213,7 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.format(prefixEnv, runScript) +
s" --driver-url $driverURL" +
s" --executor-id $taskId" +
- s" --hostname ${offer.getHostname}" +
+ s" --hostname ${executorHostname(offer)}" +
s" --cores $numCores" +
s" --app-id $appId")
} else {
@@ -225,7 +225,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverURL" +
s" --executor-id $taskId" +
- s" --hostname ${offer.getHostname}" +
+ s" --hostname ${executorHostname(offer)}" +
s" --cores $numCores" +
s" --app-id $appId")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
@@ -418,16 +418,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName("Task " + taskId)
-
taskBuilder.addAllResources(resourcesToUse.asJava)
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- sc.conf,
- taskBuilder.getContainerBuilder
- )
- }
+ taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
@@ -658,6 +650,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private def numExecutors(): Int = {
slaves.values.map(_.taskIDs.size).sum
}
+
+ private def executorHostname(offer: Offer): String = {
+ if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+ // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
+ "0.0.0.0"
+ } else {
+ offer.getHostname
+ }
+ }
}
private class Slave(val hostname: String) {
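
Not part of the patch: the hostname selection above, restated as a self-contained sketch assuming only spark-core on the classpath. `executorBindAddress` is a hypothetical helper used here to illustrate the intended behavior under CNI.

```scala
import org.apache.spark.SparkConf

// When a CNI network name is configured, the container cannot see the agent's IP,
// so the executor binds to all interfaces instead of the offer's hostname.
def executorBindAddress(conf: SparkConf, offerHostname: String): String =
  if (conf.getOption("spark.mesos.network.name").isDefined) "0.0.0.0"
  else offerHostname

val cniConf = new SparkConf().set("spark.mesos.network.name", "my-cni-net")
assert(executorBindAddress(cniConf, "agent-1.example.com") == "0.0.0.0")
assert(executorBindAddress(new SparkConf(), "agent-1.example.com") == "agent-1.example.com")
```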
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index c1aa00151e..779ffb5229 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -155,14 +155,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- sc.conf,
- executorInfo.getContainerBuilder()
- )
- }
-
+ executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
(executorInfo.build(), resourcesAfterMem.asJava)
}
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 3fe06743b8..a2adb228dc 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,8 +17,8 @@
package org.apache.spark.scheduler.cluster.mesos
-import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
-import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
+import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
@@ -99,67 +99,67 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.toList
}
- /**
- * Construct a DockerInfo structure and insert it into a ContainerInfo
- */
- def addDockerInfo(
- container: ContainerInfo.Builder,
- image: String,
- containerizer: String,
- forcePullImage: Boolean = false,
- volumes: Option[List[Volume]] = None,
- portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
-
- containerizer match {
- case "docker" =>
- container.setType(ContainerInfo.Type.DOCKER)
- val docker = ContainerInfo.DockerInfo.newBuilder()
- .setImage(image)
- .setForcePullImage(forcePullImage)
- // TODO (mgummelt): Remove this. Portmaps have no effect,
- // as we don't support bridge networking.
- portmaps.foreach(_.foreach(docker.addPortMappings))
- container.setDocker(docker)
- case "mesos" =>
- container.setType(ContainerInfo.Type.MESOS)
- val imageProto = Image.newBuilder()
- .setType(Image.Type.DOCKER)
- .setDocker(Image.Docker.newBuilder().setName(image))
- .setCached(!forcePullImage)
- container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
- case _ =>
- throw new SparkException(
- "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+ def containerInfo(conf: SparkConf): ContainerInfo = {
+ val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
+ conf.get("spark.mesos.containerizer", "docker") == "docker") {
+ ContainerInfo.Type.DOCKER
+ } else {
+ ContainerInfo.Type.MESOS
}
- volumes.foreach(_.foreach(container.addVolumes))
+ val containerInfo = ContainerInfo.newBuilder()
+ .setType(containerType)
+
+ conf.getOption("spark.mesos.executor.docker.image").map { image =>
+ val forcePullImage = conf
+ .getOption("spark.mesos.executor.docker.forcePullImage")
+ .exists(_.equals("true"))
+
+ val portMaps = conf
+ .getOption("spark.mesos.executor.docker.portmaps")
+ .map(parsePortMappingsSpec)
+ .getOrElse(List.empty)
+
+ if (containerType == ContainerInfo.Type.DOCKER) {
+ containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
+ } else {
+ containerInfo.setMesos(mesosInfo(image, forcePullImage))
+ }
+
+ val volumes = conf
+ .getOption("spark.mesos.executor.docker.volumes")
+ .map(parseVolumesSpec)
+
+ volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
+ }
+
+ conf.getOption("spark.mesos.network.name").map { name =>
+ val info = NetworkInfo.newBuilder().setName(name).build()
+ containerInfo.addNetworkInfos(info)
+ }
+
+ containerInfo.build()
}
- /**
- * Setup a docker containerizer from MesosDriverDescription scheduler properties
- */
- def setupContainerBuilderDockerInfo(
- imageName: String,
- conf: SparkConf,
- builder: ContainerInfo.Builder): Unit = {
- val forcePullImage = conf
- .getOption("spark.mesos.executor.docker.forcePullImage")
- .exists(_.equals("true"))
- val volumes = conf
- .getOption("spark.mesos.executor.docker.volumes")
- .map(parseVolumesSpec)
- val portmaps = conf
- .getOption("spark.mesos.executor.docker.portmaps")
- .map(parsePortMappingsSpec)
-
- val containerizer = conf.get("spark.mesos.containerizer", "docker")
- addDockerInfo(
- builder,
- imageName,
- containerizer,
- forcePullImage = forcePullImage,
- volumes = volumes,
- portmaps = portmaps)
- logDebug("setupContainerDockerInfo: using docker image: " + imageName)
+ private def dockerInfo(
+ image: String,
+ forcePullImage: Boolean,
+ portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
+ val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
+ .setImage(image)
+ .setForcePullImage(forcePullImage)
+ portMaps.foreach(dockerBuilder.addPortMappings(_))
+
+ dockerBuilder.build
+ }
+
+ private def mesosInfo(image: String, forcePullImage: Boolean): MesosInfo = {
+ val imageProto = Image.newBuilder()
+ .setType(Image.Type.DOCKER)
+ .setDocker(Image.Docker.newBuilder().setName(image))
+ .setCached(!forcePullImage)
+ ContainerInfo.MesosInfo.newBuilder()
+ .setImage(imageProto)
+ .build
}
}
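
Not part of the patch: a minimal usage sketch of the refactored `containerInfo` API. Since `MesosSchedulerBackendUtil` is `private[mesos]`, this only compiles inside the `org.apache.spark.scheduler.cluster.mesos` package; the image and network values are placeholders.

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.ContainerInfo
import org.apache.spark.SparkConf

object ContainerInfoExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.mesos.executor.docker.image", "repo/spark:latest") // placeholder image
      .set("spark.mesos.containerizer", "mesos")                     // select the Mesos containerizer
      .set("spark.mesos.network.name", "my-cni-net")                 // placeholder CNI network

    // A single call now derives the full ContainerInfo from configuration alone.
    val info: ContainerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
    assert(info.getType == ContainerInfo.Type.MESOS)
    assert(info.getNetworkInfos(0).getName == "my-cni-net")
  }
}
```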
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 87d9080de5..74e5ce227d 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -210,4 +210,30 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
(v.getName, v.getValue)).toMap
assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
}
+
+ test("supports spark.mesos.network.name") {
+ setScheduler()
+
+ val mem = 1000
+ val cpu = 1
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.network.name" -> "test-network-name"),
+ "s1",
+ new Date()))
+
+ assert(response.success)
+
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, List(offer).asJava)
+
+ val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+ val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
+ assert(networkInfos.size == 1)
+ assert(networkInfos.get(0).getName == "test-network-name")
+ }
}
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638fda6..a674da4066 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -388,9 +388,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val dockerInfo = containerInfo.getDocker
- assert(dockerInfo.getImage == "some_image")
- assert(dockerInfo.getForcePullImage)
-
val portMappings = dockerInfo.getPortMappingsList.asScala
assert(portMappings.size == 1)
@@ -491,6 +488,22 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!uris.asScala.head.getCache)
}
+ test("mesos supports spark.mesos.network.name") {
+ setBackend(Map(
+ "spark.mesos.network.name" -> "test-network-name"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
+ assert(networkInfos.size == 1)
+ assert(networkInfos.get(0).getName == "test-network-name")
+ }
+
private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
private def verifyDeclinedOffer(driver: SchedulerDriver,