aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorChris Heller <hellertime@gmail.com>2015-05-01 18:41:22 -0700
committerAndrew Or <andrew@databricks.com>2015-05-01 18:41:22 -0700
commit8f50a07d2188ccc5315d979755188b1e5d5b5471 (patch)
tree4f3fc389e598179c1774a1bfb25bcb4be4418651 /core
parentb4b43df8a338a30c0eadcf10cbe3ba203dc3f861 (diff)
downloadspark-8f50a07d2188ccc5315d979755188b1e5d5b5471.tar.gz
spark-8f50a07d2188ccc5315d979755188b1e5d5b5471.tar.bz2
spark-8f50a07d2188ccc5315d979755188b1e5d5b5471.zip
[SPARK-2691] [MESOS] Support for Mesos DockerInfo
This patch adds partial support for running spark on mesos inside of a docker container. Only fine-grained mode is presently supported, and there is no checking done to ensure that the version of libmesos is recent enough to have a DockerInfo structure in the protobuf (other than pinning a mesos version in the pom.xml). Author: Chris Heller <hellertime@gmail.com> Closes #3074 from hellertime/SPARK-2691 and squashes the following commits: d504af6 [Chris Heller] Assist type inference f64885d [Chris Heller] Fix errant line length 17c41c0 [Chris Heller] Base Dockerfile on mesosphere/mesos image 8aebda4 [Chris Heller] Simplfy Docker image docs 1ae7f4f [Chris Heller] Style points 974bd56 [Chris Heller] Convert map to flatMap 5d8bdf7 [Chris Heller] Factor out the DockerInfo construction. 7b75a3d [Chris Heller] Align to styleguide 80108e7 [Chris Heller] Bend to the will of RAT ba77056 [Chris Heller] Explicit RAT exclude abda5e5 [Chris Heller] Wildcard .rat-excludes 2f2873c [Chris Heller] Exclude spark-mesos from RAT a589a5b [Chris Heller] Add example Dockerfile b6825ce [Chris Heller] Remove use of EasyMock eae1b86 [Chris Heller] Move properties under 'spark.mesos.' c184d00 [Chris Heller] Use map on Option to be consistent with non-coarse code fb9501a [Chris Heller] Bumped mesos version to current release fa11879 [Chris Heller] Add listenerBus to EasyMock 882151e [Chris Heller] Changes to scala style b22d42d [Chris Heller] Exclude template from RAT db536cf [Chris Heller] Remove unneeded mocks dea1bd5 [Chris Heller] Force default protocol 7dac042 [Chris Heller] Add test for DockerInfo 5456c0c [Chris Heller] Adjust syntax style 521c194 [Chris Heller] Adjust version info 6e38f70 [Chris Heller] Document Mesos Docker properties 29572ab [Chris Heller] Support all DockerInfo fields b8c0dea [Chris Heller] Support for mesos DockerInfo in coarse-mode. 482a9fd [Chris Heller] Support for mesos DockerInfo in fine-grained mode.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala142
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala46
4 files changed, 203 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3412301e64..dc59545b43 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -196,9 +196,14 @@ private[spark] class CoarseMesosSchedulerBackend(
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
- .build()
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+ }
+
d.launchTasks(
- Collections.singleton(offer.getId), Collections.singletonList(task), filters)
+ Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
} else {
// Filter it out
d.launchTasks(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 86a7d0fb58..db0a080b3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,13 +124,19 @@ private[spark] class MesosSchedulerBackend(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
- MesosExecutorInfo.newBuilder()
+ val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(cpus)
.addResources(memory)
- .build()
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+ }
+
+ executorInfo.build()
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
new file mode 100644
index 0000000000..928c5cfed4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.{ContainerInfo, Volume}
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * A collection of utility functions which can be used by both the
+ * MesosSchedulerBackend and the CoarseMesosSchedulerBackend.
+ */
+private[mesos] object MesosSchedulerBackendUtil extends Logging {
+ /**
+ * Parse a comma-delimited list of volume specs, each of which
+ * takes the form [host-dir:]container-dir[:rw|:ro].
+ */
+ def parseVolumesSpec(volumes: String): List[Volume] = {
+ volumes.split(",").map(_.split(":")).flatMap { spec =>
+ val vol: Volume.Builder = Volume
+ .newBuilder()
+ .setMode(Volume.Mode.RW)
+ spec match {
+ case Array(container_path) =>
+ Some(vol.setContainerPath(container_path))
+ case Array(container_path, "rw") =>
+ Some(vol.setContainerPath(container_path))
+ case Array(container_path, "ro") =>
+ Some(vol.setContainerPath(container_path)
+ .setMode(Volume.Mode.RO))
+ case Array(host_path, container_path) =>
+ Some(vol.setContainerPath(container_path)
+ .setHostPath(host_path))
+ case Array(host_path, container_path, "rw") =>
+ Some(vol.setContainerPath(container_path)
+ .setHostPath(host_path))
+ case Array(host_path, container_path, "ro") =>
+ Some(vol.setContainerPath(container_path)
+ .setHostPath(host_path)
+ .setMode(Volume.Mode.RO))
+ case spec => {
+ logWarning(s"Unable to parse volume specs: $volumes. "
+ + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
+ None
+ }
+ }
+ }
+ .map { _.build() }
+ .toList
+ }
+
+ /**
+ * Parse a comma-delimited list of port mapping specs, each of which
+ * takes the form host_port:container_port[:udp|:tcp]
+ *
+ * Note:
+ * the docker form is [ip:]host_port:container_port, but the DockerInfo
+ * message has no field for 'ip', and instead has a 'protocol' field.
+ * Docker itself only appears to support TCP, so this alternative form
+ * anticipates the expansion of the docker form to allow for a protocol
+ * and leaves open the chance for mesos to begin to accept an 'ip' field
+ */
+ def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
+ portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
+ val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
+ .newBuilder()
+ .setProtocol("tcp")
+ spec match {
+ case Array(host_port, container_port) =>
+ Some(portmap.setHostPort(host_port.toInt)
+ .setContainerPort(container_port.toInt))
+ case Array(host_port, container_port, protocol) =>
+ Some(portmap.setHostPort(host_port.toInt)
+ .setContainerPort(container_port.toInt)
+ .setProtocol(protocol))
+ case spec => {
+ logWarning(s"Unable to parse port mapping specs: $portmaps. "
+ + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
+ None
+ }
+ }
+ }
+ .map { _.build() }
+ .toList
+ }
+
+ /**
+ * Construct a DockerInfo structure and insert it into a ContainerInfo
+ */
+ def addDockerInfo(
+ container: ContainerInfo.Builder,
+ image: String,
+ volumes: Option[List[Volume]] = None,
+ network: Option[ContainerInfo.DockerInfo.Network] = None,
+ portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None):Unit = {
+
+ val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+
+ network.foreach(docker.setNetwork)
+ portmaps.foreach(_.foreach(docker.addPortMappings))
+ container.setType(ContainerInfo.Type.DOCKER)
+ container.setDocker(docker.build())
+ volumes.foreach(_.foreach(container.addVolumes))
+ }
+
+ /**
+ * Setup a docker containerizer
+ */
+ def setupContainerBuilderDockerInfo(
+ imageName: String,
+ conf: SparkConf,
+ builder: ContainerInfo.Builder): Unit = {
+ val volumes = conf
+ .getOption("spark.mesos.executor.docker.volumes")
+ .map(parseVolumesSpec)
+ val portmaps = conf
+ .getOption("spark.mesos.executor.docker.portmaps")
+ .map(parsePortMappingsSpec)
+ addDockerInfo(
+ builder,
+ imageName,
+ volumes = volumes,
+ portmaps = portmaps)
+ logDebug("setupContainerDockerInfo: using docker image: " + imageName)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index cdd7be0fbe..ab863f3d8d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -73,6 +73,52 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
}
+ test("spark docker properties correctly populate the DockerInfo message") {
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val conf = new SparkConf()
+ .set("spark.mesos.executor.docker.image", "spark/mock")
+ .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
+ .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(conf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val execInfo = backend.createExecutorInfo("mockExecutor")
+ assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+ val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
+ assert(portmaps.get(0).getHostPort.equals(80))
+ assert(portmaps.get(0).getContainerPort.equals(8080))
+ assert(portmaps.get(0).getProtocol.equals("tcp"))
+ assert(portmaps.get(1).getHostPort.equals(53))
+ assert(portmaps.get(1).getContainerPort.equals(53))
+ assert(portmaps.get(1).getProtocol.equals("tcp"))
+ val volumes = execInfo.getContainer.getVolumesList
+ assert(volumes.get(0).getContainerPath.equals("/a"))
+ assert(volumes.get(0).getMode.equals(Volume.Mode.RW))
+ assert(volumes.get(1).getContainerPath.equals("/b"))
+ assert(volumes.get(1).getHostPath.equals("/b"))
+ assert(volumes.get(1).getMode.equals(Volume.Mode.RW))
+ assert(volumes.get(2).getContainerPath.equals("/c"))
+ assert(volumes.get(2).getHostPath.equals("/c"))
+ assert(volumes.get(2).getMode.equals(Volume.Mode.RW))
+ assert(volumes.get(3).getContainerPath.equals("/d"))
+ assert(volumes.get(3).getMode.equals(Volume.Mode.RO))
+ assert(volumes.get(4).getContainerPath.equals("/e"))
+ assert(volumes.get(4).getHostPath.equals("/e"))
+ assert(volumes.get(4).getMode.equals(Volume.Mode.RO))
+ }
+
test("mesos resource offers result in launching tasks") {
def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
val builder = Offer.newBuilder()