-rw-r--r--  .rat-excludes | 1
-rw-r--r--  conf/docker.properties.template | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala | 142
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala | 46
-rw-r--r--  docker/spark-mesos/Dockerfile | 30
-rw-r--r--  docs/running-on-mesos.md | 42
-rw-r--r--  pom.xml | 2
9 files changed, 280 insertions(+), 5 deletions(-)
diff --git a/.rat-excludes b/.rat-excludes
index 4468da1900..2238a5b68e 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -15,6 +15,7 @@ TAGS
RELEASE
control
docs
+docker.properties.template
fairscheduler.xml.template
spark-defaults.conf.template
log4j.properties
diff --git a/conf/docker.properties.template b/conf/docker.properties.template
new file mode 100644
index 0000000000..26e3bfd9c5
--- /dev/null
+++ b/conf/docker.properties.template
@@ -0,0 +1,3 @@
+spark.mesos.executor.docker.image: <image built from `../docker/spark-mesos/Dockerfile`>
+spark.mesos.executor.docker.volumes: /usr/local/lib:/host/usr/local/lib:ro
+spark.mesos.executor.home: /opt/spark
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3412301e64..dc59545b43 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -196,9 +196,14 @@ private[spark] class CoarseMesosSchedulerBackend(
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
- .build()
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+ }
+
d.launchTasks(
- Collections.singleton(offer.getId), Collections.singletonList(task), filters)
+ Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
} else {
// Filter it out
d.launchTasks(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 86a7d0fb58..db0a080b3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,13 +124,19 @@ private[spark] class MesosSchedulerBackend(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
- MesosExecutorInfo.newBuilder()
+ val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(cpus)
.addResources(memory)
- .build()
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+ }
+
+ executorInfo.build()
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
new file mode 100644
index 0000000000..928c5cfed4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.{ContainerInfo, Volume}
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * A collection of utility functions which can be used by both the
+ * MesosSchedulerBackend and the CoarseMesosSchedulerBackend.
+ */
+private[mesos] object MesosSchedulerBackendUtil extends Logging {
+ /**
+ * Parse a comma-delimited list of volume specs, each of which
+ * takes the form [host-dir:]container-dir[:rw|:ro].
+ */
+ def parseVolumesSpec(volumes: String): List[Volume] = {
+ volumes.split(",").map(_.split(":")).flatMap { spec =>
+ val vol: Volume.Builder = Volume
+ .newBuilder()
+ .setMode(Volume.Mode.RW)
+ spec match {
+ case Array(container_path) =>
+ Some(vol.setContainerPath(container_path))
+ case Array(container_path, "rw") =>
+ Some(vol.setContainerPath(container_path))
+ case Array(container_path, "ro") =>
+ Some(vol.setContainerPath(container_path)
+ .setMode(Volume.Mode.RO))
+ case Array(host_path, container_path) =>
+ Some(vol.setContainerPath(container_path)
+ .setHostPath(host_path))
+ case Array(host_path, container_path, "rw") =>
+ Some(vol.setContainerPath(container_path)
+ .setHostPath(host_path))
+ case Array(host_path, container_path, "ro") =>
+ Some(vol.setContainerPath(container_path)
+ .setHostPath(host_path)
+ .setMode(Volume.Mode.RO))
+ case spec => {
+ logWarning(s"Unable to parse volume specs: $volumes. "
+ + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
+ None
+ }
+ }
+ }
+ .map { _.build() }
+ .toList
+ }
+
+ /**
+ * Parse a comma-delimited list of port mapping specs, each of which
+ * takes the form host_port:container_port[:udp|:tcp]
+ *
+ * Note:
+ * the docker form is [ip:]host_port:container_port, but the DockerInfo
+ * message has no field for 'ip', and instead has a 'protocol' field.
+ * Docker itself only appears to support TCP, so this alternative form
+ * anticipates the expansion of the docker form to allow for a protocol
+ * and leaves open the chance for mesos to begin to accept an 'ip' field
+ * and leaves open the chance for Mesos to begin to accept an 'ip' field.
+ def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
+ portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
+ val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
+ .newBuilder()
+ .setProtocol("tcp")
+ spec match {
+ case Array(host_port, container_port) =>
+ Some(portmap.setHostPort(host_port.toInt)
+ .setContainerPort(container_port.toInt))
+ case Array(host_port, container_port, protocol) =>
+ Some(portmap.setHostPort(host_port.toInt)
+ .setContainerPort(container_port.toInt)
+ .setProtocol(protocol))
+ case spec => {
+ logWarning(s"Unable to parse port mapping specs: $portmaps. "
+ + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
+ None
+ }
+ }
+ }
+ .map { _.build() }
+ .toList
+ }
+
+ /**
+ * Construct a DockerInfo structure and insert it into a ContainerInfo
+ */
+ def addDockerInfo(
+ container: ContainerInfo.Builder,
+ image: String,
+ volumes: Option[List[Volume]] = None,
+ network: Option[ContainerInfo.DockerInfo.Network] = None,
+ portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None):Unit = {
+
+ val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+
+ network.foreach(docker.setNetwork)
+ portmaps.foreach(_.foreach(docker.addPortMappings))
+ container.setType(ContainerInfo.Type.DOCKER)
+ container.setDocker(docker.build())
+ volumes.foreach(_.foreach(container.addVolumes))
+ }
+
+ /**
+   * Set up a Docker containerizer.
+ */
+ def setupContainerBuilderDockerInfo(
+ imageName: String,
+ conf: SparkConf,
+ builder: ContainerInfo.Builder): Unit = {
+ val volumes = conf
+ .getOption("spark.mesos.executor.docker.volumes")
+ .map(parseVolumesSpec)
+ val portmaps = conf
+ .getOption("spark.mesos.executor.docker.portmaps")
+ .map(parsePortMappingsSpec)
+ addDockerInfo(
+ builder,
+ imageName,
+ volumes = volumes,
+ portmaps = portmaps)
+    logDebug("setupContainerBuilderDockerInfo: using docker image: " + imageName)
+ }
+}
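For orientation, here is a minimal, hypothetical sketch (not part of the patch) of how the new helper is meant to be driven; the object name `DockerInfoExample` is invented, and it must live in the `org.apache.spark.scheduler.cluster.mesos` package because the helper is `private[mesos]`:

    package org.apache.spark.scheduler.cluster.mesos

    import org.apache.mesos.Protos.ContainerInfo

    import org.apache.spark.SparkConf

    object DockerInfoExample {
      def main(args: Array[String]): Unit = {
        // The same properties an application would put in docker.properties or SparkConf.
        val conf = new SparkConf()
          .set("spark.mesos.executor.docker.image", "spark/mock")
          .set("spark.mesos.executor.docker.volumes", "/usr/local/lib:/host/usr/local/lib:ro")
          .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")

        // This mirrors what the two scheduler backends do: parse the volume and
        // port mapping specs from the conf and attach a DockerInfo to the builder.
        val builder = ContainerInfo.newBuilder()
        MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
          conf.get("spark.mesos.executor.docker.image"), conf, builder)

        val container = builder.build()
        println(container.getDocker.getImage)             // spark/mock
        println(container.getVolumesList)                 // one read-only volume
        println(container.getDocker.getPortMappingsList)  // two tcp port mappings
      }
    }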
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index cdd7be0fbe..ab863f3d8d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -73,6 +73,52 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
}
+ test("spark docker properties correctly populate the DockerInfo message") {
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val conf = new SparkConf()
+ .set("spark.mesos.executor.docker.image", "spark/mock")
+ .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
+ .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(conf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val execInfo = backend.createExecutorInfo("mockExecutor")
+ assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+ val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
+ assert(portmaps.get(0).getHostPort.equals(80))
+ assert(portmaps.get(0).getContainerPort.equals(8080))
+ assert(portmaps.get(0).getProtocol.equals("tcp"))
+ assert(portmaps.get(1).getHostPort.equals(53))
+ assert(portmaps.get(1).getContainerPort.equals(53))
+ assert(portmaps.get(1).getProtocol.equals("tcp"))
+ val volumes = execInfo.getContainer.getVolumesList
+ assert(volumes.get(0).getContainerPath.equals("/a"))
+ assert(volumes.get(0).getMode.equals(Volume.Mode.RW))
+ assert(volumes.get(1).getContainerPath.equals("/b"))
+ assert(volumes.get(1).getHostPath.equals("/b"))
+ assert(volumes.get(1).getMode.equals(Volume.Mode.RW))
+ assert(volumes.get(2).getContainerPath.equals("/c"))
+ assert(volumes.get(2).getHostPath.equals("/c"))
+ assert(volumes.get(2).getMode.equals(Volume.Mode.RW))
+ assert(volumes.get(3).getContainerPath.equals("/d"))
+ assert(volumes.get(3).getMode.equals(Volume.Mode.RO))
+ assert(volumes.get(4).getContainerPath.equals("/e"))
+ assert(volumes.get(4).getHostPath.equals("/e"))
+ assert(volumes.get(4).getMode.equals(Volume.Mode.RO))
+ }
+
test("mesos resource offers result in launching tasks") {
def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
val builder = Offer.newBuilder()
diff --git a/docker/spark-mesos/Dockerfile b/docker/spark-mesos/Dockerfile
new file mode 100644
index 0000000000..b90aef3655
--- /dev/null
+++ b/docker/spark-mesos/Dockerfile
@@ -0,0 +1,30 @@
+# This is an example Dockerfile for creating a Spark image which can be
+# referenced by the Spark property 'spark.mesos.executor.docker.image'
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM mesosphere/mesos:0.20.1
+
+# Update the base ubuntu image with dependencies needed for Spark
+RUN apt-get update && \
+ apt-get install -y python libnss3 openjdk-7-jre-headless curl
+
+RUN mkdir /opt/spark && \
+ curl http://www.apache.org/dyn/closer.cgi/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.4.tgz \
+ | tar -xzC /opt
+ENV SPARK_HOME /opt/spark
+ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8f53d8201a..5f1d6daeb2 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,16 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
only makes sense if you run just one application at a time. You can cap the maximum number of cores
using `conf.set("spark.cores.max", "10")` (for example).
+# Mesos Docker Support
+
+Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
+in your [SparkConf](configuration.html#spark-properties).
+
+The Docker image used must already contain an appropriate version of Spark, or you can
+have Mesos download Spark via the usual methods.
+
+Requires Mesos version 0.20.1 or later.
+
# Running Alongside Hadoop
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -238,6 +248,38 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.executor.docker.image</code></td>
+ <td>(none)</td>
+ <td>
+ Set the name of the docker image that the Spark executors will run in. The selected
+ image must have Spark installed, as well as a compatible version of the Mesos library.
+ The installed path of Spark in the image can be specified with <code>spark.mesos.executor.home</code>;
+ the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_LIBRARY</code>.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.executor.docker.volumes</code></td>
+ <td>(none)</td>
+ <td>
+ Set the list of volumes which will be mounted into the Docker image, which was set using
+ <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
+    mappings following the form passed to <tt>docker run -v</tt>. That is, they take the form:
+
+ <pre>[host_path:]container_path[:ro|:rw]</pre>
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.executor.docker.portmaps</code></td>
+ <td>(none)</td>
+ <td>
+ Set the list of incoming ports exposed by the Docker image, which was set using
+ <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
+ mappings which take the form:
+
+ <pre>host_port:container_port[:tcp|:udp]</pre>
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.executor.home</code></td>
<td>driver side <code>SPARK_HOME</code></td>
<td>
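To tie the documented properties together, a minimal, hypothetical driver-side sketch (not part of the patch; the master URL, image name, and paths are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object MesosDockerApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("mesos://host:5050")  // placeholder Mesos master URL
          .setAppName("MesosDockerApp")
          // Docker image the executors run in; must contain Spark and a compatible libmesos.
          .set("spark.mesos.executor.docker.image", "example/spark-mesos:latest")
          // Mount the host's native libraries read-only, as in conf/docker.properties.template.
          .set("spark.mesos.executor.docker.volumes", "/usr/local/lib:/host/usr/local/lib:ro")
          // Where Spark is installed inside the image.
          .set("spark.mesos.executor.home", "/opt/spark")

        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 100).sum())  // trivial job to exercise the Dockerized executors
        sc.stop()
      }
    }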
diff --git a/pom.xml b/pom.xml
index c85c5feeaf..4313f94003 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
<java.version>1.6</java.version>
<sbt.project.name>spark</sbt.project.name>
<scala.macros.version>2.0.1</scala.macros.version>
- <mesos.version>0.21.0</mesos.version>
+ <mesos.version>0.21.1</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.10</slf4j.version>
<log4j.version>1.2.17</log4j.version>