diff options
author | Chris Heller <hellertime@gmail.com> | 2015-05-01 18:41:22 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-01 18:41:22 -0700 |
commit | 8f50a07d2188ccc5315d979755188b1e5d5b5471 (patch) | |
tree | 4f3fc389e598179c1774a1bfb25bcb4be4418651 /core | |
parent | b4b43df8a338a30c0eadcf10cbe3ba203dc3f861 (diff) | |
download | spark-8f50a07d2188ccc5315d979755188b1e5d5b5471.tar.gz spark-8f50a07d2188ccc5315d979755188b1e5d5b5471.tar.bz2 spark-8f50a07d2188ccc5315d979755188b1e5d5b5471.zip |
[SPARK-2691] [MESOS] Support for Mesos DockerInfo
This patch adds partial support for running spark on mesos inside of a docker container. Only fine-grained mode is presently supported, and there is no checking done to ensure that the version of libmesos is recent enough to have a DockerInfo structure in the protobuf (other than pinning a mesos version in the pom.xml).
Author: Chris Heller <hellertime@gmail.com>
Closes #3074 from hellertime/SPARK-2691 and squashes the following commits:
d504af6 [Chris Heller] Assist type inference
f64885d [Chris Heller] Fix errant line length
17c41c0 [Chris Heller] Base Dockerfile on mesosphere/mesos image
8aebda4 [Chris Heller] Simplfy Docker image docs
1ae7f4f [Chris Heller] Style points
974bd56 [Chris Heller] Convert map to flatMap
5d8bdf7 [Chris Heller] Factor out the DockerInfo construction.
7b75a3d [Chris Heller] Align to styleguide
80108e7 [Chris Heller] Bend to the will of RAT
ba77056 [Chris Heller] Explicit RAT exclude
abda5e5 [Chris Heller] Wildcard .rat-excludes
2f2873c [Chris Heller] Exclude spark-mesos from RAT
a589a5b [Chris Heller] Add example Dockerfile
b6825ce [Chris Heller] Remove use of EasyMock
eae1b86 [Chris Heller] Move properties under 'spark.mesos.'
c184d00 [Chris Heller] Use map on Option to be consistent with non-coarse code
fb9501a [Chris Heller] Bumped mesos version to current release
fa11879 [Chris Heller] Add listenerBus to EasyMock
882151e [Chris Heller] Changes to scala style
b22d42d [Chris Heller] Exclude template from RAT
db536cf [Chris Heller] Remove unneeded mocks
dea1bd5 [Chris Heller] Force default protocol
7dac042 [Chris Heller] Add test for DockerInfo
5456c0c [Chris Heller] Adjust syntax style
521c194 [Chris Heller] Adjust version info
6e38f70 [Chris Heller] Document Mesos Docker properties
29572ab [Chris Heller] Support all DockerInfo fields
b8c0dea [Chris Heller] Support for mesos DockerInfo in coarse-mode.
482a9fd [Chris Heller] Support for mesos DockerInfo in fine-grained mode.
Diffstat (limited to 'core')
4 files changed, 203 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 3412301e64..dc59545b43 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -196,9 +196,14 @@ private[spark] class CoarseMesosSchedulerBackend( .addResources(createResource("cpus", cpusToUse)) .addResources(createResource("mem", MemoryUtils.calculateTotalMemory(sc))) - .build() + + sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => + MesosSchedulerBackendUtil + .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder()) + } + d.launchTasks( - Collections.singleton(offer.getId), Collections.singletonList(task), filters) + Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters) } else { // Filter it out d.launchTasks( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 86a7d0fb58..db0a080b3b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -124,13 +124,19 @@ private[spark] class MesosSchedulerBackend( Value.Scalar.newBuilder() .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) .build() - MesosExecutorInfo.newBuilder() + val executorInfo = MesosExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) .addResources(cpus) .addResources(memory) - .build() + + sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => + MesosSchedulerBackendUtil + .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) + } + + executorInfo.build() } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala new file mode 100644 index 0000000000..928c5cfed4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.mesos.Protos.{ContainerInfo, Volume} +import org.apache.mesos.Protos.ContainerInfo.DockerInfo + +import org.apache.spark.{Logging, SparkConf} + +/** + * A collection of utility functions which can be used by both the + * MesosSchedulerBackend and the CoarseMesosSchedulerBackend. + */ +private[mesos] object MesosSchedulerBackendUtil extends Logging { + /** + * Parse a comma-delimited list of volume specs, each of which + * takes the form [host-dir:]container-dir[:rw|:ro]. + */ + def parseVolumesSpec(volumes: String): List[Volume] = { + volumes.split(",").map(_.split(":")).flatMap { spec => + val vol: Volume.Builder = Volume + .newBuilder() + .setMode(Volume.Mode.RW) + spec match { + case Array(container_path) => + Some(vol.setContainerPath(container_path)) + case Array(container_path, "rw") => + Some(vol.setContainerPath(container_path)) + case Array(container_path, "ro") => + Some(vol.setContainerPath(container_path) + .setMode(Volume.Mode.RO)) + case Array(host_path, container_path) => + Some(vol.setContainerPath(container_path) + .setHostPath(host_path)) + case Array(host_path, container_path, "rw") => + Some(vol.setContainerPath(container_path) + .setHostPath(host_path)) + case Array(host_path, container_path, "ro") => + Some(vol.setContainerPath(container_path) + .setHostPath(host_path) + .setMode(Volume.Mode.RO)) + case spec => { + logWarning(s"Unable to parse volume specs: $volumes. " + + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"") + None + } + } + } + .map { _.build() } + .toList + } + + /** + * Parse a comma-delimited list of port mapping specs, each of which + * takes the form host_port:container_port[:udp|:tcp] + * + * Note: + * the docker form is [ip:]host_port:container_port, but the DockerInfo + * message has no field for 'ip', and instead has a 'protocol' field. + * Docker itself only appears to support TCP, so this alternative form + * anticipates the expansion of the docker form to allow for a protocol + * and leaves open the chance for mesos to begin to accept an 'ip' field + */ + def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = { + portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] => + val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping + .newBuilder() + .setProtocol("tcp") + spec match { + case Array(host_port, container_port) => + Some(portmap.setHostPort(host_port.toInt) + .setContainerPort(container_port.toInt)) + case Array(host_port, container_port, protocol) => + Some(portmap.setHostPort(host_port.toInt) + .setContainerPort(container_port.toInt) + .setProtocol(protocol)) + case spec => { + logWarning(s"Unable to parse port mapping specs: $portmaps. " + + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"") + None + } + } + } + .map { _.build() } + .toList + } + + /** + * Construct a DockerInfo structure and insert it into a ContainerInfo + */ + def addDockerInfo( + container: ContainerInfo.Builder, + image: String, + volumes: Option[List[Volume]] = None, + network: Option[ContainerInfo.DockerInfo.Network] = None, + portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None):Unit = { + + val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image) + + network.foreach(docker.setNetwork) + portmaps.foreach(_.foreach(docker.addPortMappings)) + container.setType(ContainerInfo.Type.DOCKER) + container.setDocker(docker.build()) + volumes.foreach(_.foreach(container.addVolumes)) + } + + /** + * Setup a docker containerizer + */ + def setupContainerBuilderDockerInfo( + imageName: String, + conf: SparkConf, + builder: ContainerInfo.Builder): Unit = { + val volumes = conf + .getOption("spark.mesos.executor.docker.volumes") + .map(parseVolumesSpec) + val portmaps = conf + .getOption("spark.mesos.executor.docker.portmaps") + .map(parsePortMappingsSpec) + addDockerInfo( + builder, + imageName, + volumes = volumes, + portmaps = portmaps) + logDebug("setupContainerDockerInfo: using docker image: " + imageName) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index cdd7be0fbe..ab863f3d8d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -73,6 +73,52 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") } + test("spark docker properties correctly populate the DockerInfo message") { + val taskScheduler = mock[TaskSchedulerImpl] + + val conf = new SparkConf() + .set("spark.mesos.executor.docker.image", "spark/mock") + .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") + .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/spark-home")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(conf) + when(sc.listenerBus).thenReturn(listenerBus) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val execInfo = backend.createExecutorInfo("mockExecutor") + assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) + val portmaps = execInfo.getContainer.getDocker.getPortMappingsList + assert(portmaps.get(0).getHostPort.equals(80)) + assert(portmaps.get(0).getContainerPort.equals(8080)) + assert(portmaps.get(0).getProtocol.equals("tcp")) + assert(portmaps.get(1).getHostPort.equals(53)) + assert(portmaps.get(1).getContainerPort.equals(53)) + assert(portmaps.get(1).getProtocol.equals("tcp")) + val volumes = execInfo.getContainer.getVolumesList + assert(volumes.get(0).getContainerPath.equals("/a")) + assert(volumes.get(0).getMode.equals(Volume.Mode.RW)) + assert(volumes.get(1).getContainerPath.equals("/b")) + assert(volumes.get(1).getHostPath.equals("/b")) + assert(volumes.get(1).getMode.equals(Volume.Mode.RW)) + assert(volumes.get(2).getContainerPath.equals("/c")) + assert(volumes.get(2).getHostPath.equals("/c")) + assert(volumes.get(2).getMode.equals(Volume.Mode.RW)) + assert(volumes.get(3).getContainerPath.equals("/d")) + assert(volumes.get(3).getMode.equals(Volume.Mode.RO)) + assert(volumes.get(4).getContainerPath.equals("/e")) + assert(volumes.get(4).getHostPath.equals("/e")) + assert(volumes.get(4).getMode.equals(Volume.Mode.RO)) + } + test("mesos resource offers result in launching tasks") { def createOffer(id: Int, mem: Int, cpu: Int): Offer = { val builder = Offer.newBuilder() |