author     Philipp Hoffmann <mail@philipphoffmann.de>    2016-07-25 20:14:47 +0100
committer  Sean Owen <sowen@cloudera.com>                2016-07-25 20:14:47 +0100
commit     978cd5f125eb5a410bad2e60bf8385b11cf1b978 (patch)
tree       c33a44dc7487259409a64eb515ae6c6e002c56fb
parent     dd784a8822497ad0631208d56325c4d74ab9e036 (diff)
[SPARK-15271][MESOS] Allow force pulling executor docker images
## What changes were proposed in this pull request?

By default, Mesos agents will not pull docker images that are already cached locally. In order to run Spark executors from mutable tags like `:latest`, this commit introduces a Spark setting, `spark.mesos.executor.docker.forcePullImage`. Setting this flag to `true` tells the Mesos agent to force pull the docker image (the default is `false`, which is consistent with the previous implementation and Mesos' default behaviour).

## How was this patch tested?

I ran a sample application including this change on a Mesos cluster and verified the correct behaviour both with and without force pulling the executor image. As expected, the image is force pulled when the flag is set.

Author: Philipp Hoffmann <mail@philipphoffmann.de>

Closes #13051 from philipphoffmann/force-pull-image.
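For orientation, here is a minimal, hypothetical driver-side sketch of opting in to force pulling. The app name, master URL, and image tag are placeholders, not values from this patch; the two `spark.mesos.executor.docker.*` keys are the ones the patch reads.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch only: "my-app", the master URL, and the image
// tag below are placeholders.
val conf = new SparkConf()
  .setAppName("my-app")
  .setMaster("mesos://zk://master:2181/mesos")
  .set("spark.mesos.executor.docker.image", "myrepo/spark-executor:latest")
  // New in this patch; defaults to "false" (agent reuses a locally cached image).
  .set("spark.mesos.executor.docker.forcePullImage", "true")
val sc = new SparkContext(conf)
```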
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 63
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.7 | 2
-rw-r--r--  docs/_config.yml | 2
-rw-r--r--  docs/running-on-mesos.md | 12
-rw-r--r--  pom.xml | 2
14 files changed, 110 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d0e2..1e9644d06e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
- val container = taskInfo.getContainerBuilder()
- val volumes = submission.schedulerProperties
- .get("spark.mesos.executor.docker.volumes")
- .map(MesosSchedulerBackendUtil.parseVolumesSpec)
- val portmaps = submission.schedulerProperties
- .get("spark.mesos.executor.docker.portmaps")
- .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
- MesosSchedulerBackendUtil.addDockerInfo(
- container, image, volumes = volumes, portmaps = portmaps)
- taskInfo.setContainer(container.build())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ submission.schedulerProperties.get,
+ taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d39583..52993caad1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ taskBuilder.getContainerBuilder
+ )
}
tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b595..8d4fc9eed7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ executorInfo.getContainerBuilder()
+ )
}
(executorInfo.build(), resourcesAfterMem.asJava)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b08944..aa669f01bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
-import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
+ forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
- val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+ val docker = ContainerInfo.DockerInfo.newBuilder()
+ .setImage(image)
+ .setForcePullImage(forcePullImage)
network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
- * Setup a docker containerizer
+ * Setup a docker containerizer from MesosDriverDescription scheduler properties
*/
def setupContainerBuilderDockerInfo(
imageName: String,
- conf: SparkConf,
+ conf: String => Option[String],
builder: ContainerInfo.Builder): Unit = {
- val volumes = conf
- .getOption("spark.mesos.executor.docker.volumes")
+ val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+ .exists(_.equals("true"))
+ val volumes = conf("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
- val portmaps = conf
- .getOption("spark.mesos.executor.docker.portmaps")
+ val portmaps = conf("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
+
addDockerInfo(
builder,
imageName,
+ forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
logDebug("setupContainerDockerInfo: using docker image: " + imageName)
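The signature change above is the heart of the refactoring: by accepting a plain `String => Option[String]` accessor instead of a concrete `SparkConf`, the helper can be shared by the coarse- and fine-grained backends (which hold a `SparkConf`) and by `MesosClusterScheduler` (which holds scheduler properties as a `Map`). A minimal sketch of why both call sites type-check; the map contents are hypothetical:

```scala
import org.apache.spark.SparkConf

// A SparkConf and a Map[String, String] both expose a
// String => Option[String] view of their keys:
val sparkConf = new SparkConf()
  .set("spark.mesos.executor.docker.forcePullImage", "true")
val schedulerProperties =
  Map("spark.mesos.executor.docker.forcePullImage" -> "true")

val fromConf: String => Option[String] = sparkConf.getOption      // eta-expanded method
val fromProps: String => Option[String] = schedulerProperties.get

// Either accessor can be handed to setupContainerBuilderDockerInfo; the
// helper never needs to know which configuration store the value came from.
assert(fromConf("spark.mesos.executor.docker.forcePullImage").contains("true"))
assert(fromProps("spark.mesos.executor.docker.forcePullImage").contains("true"))
```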
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c2779d7b35..d3a85c654e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
+ test("docker settings are reflected in created tasks") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "some_image",
+ "spark.mesos.executor.docker.forcePullImage" -> "true",
+ "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+ "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched("o1").asScala
+ assert(launchedTasks.size == 1)
+
+ val containerInfo = launchedTasks.head.getContainer
+ assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+ val volumes = containerInfo.getVolumesList.asScala
+ assert(volumes.size == 1)
+
+ val volume = volumes.head
+ assert(volume.getHostPath == "/host_vol")
+ assert(volume.getContainerPath == "/container_vol")
+ assert(volume.getMode == Volume.Mode.RO)
+
+ val dockerInfo = containerInfo.getDocker
+
+ assert(dockerInfo.getImage == "some_image")
+ assert(dockerInfo.getForcePullImage)
+
+ val portMappings = dockerInfo.getPortMappingsList.asScala
+ assert(portMappings.size == 1)
+
+ val portMapping = portMappings.head
+ assert(portMapping.getHostPort == 8080)
+ assert(portMapping.getContainerPort == 80)
+ assert(portMapping.getProtocol == "tcp")
+ }
+
+ test("force-pull-image option is disabled by default") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "some_image"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched("o1").asScala
+ assert(launchedTasks.size == 1)
+
+ val containerInfo = launchedTasks.head.getContainer
+ assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+ val dockerInfo = containerInfo.getDocker
+
+ assert(dockerInfo.getImage == "some_image")
+ assert(!dockerInfo.getForcePullImage)
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 41693b1191..fcf39f6391 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
+ .set("spark.mesos.executor.docker.forcePullImage", "true")
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+ assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 5d536b7c24..ff15873140 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index d16f42a97d..2b5764f868 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 2e261cb9a5..3f53fdb09c 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 67f38f4c22..d3a7ab8bb4 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 07583963d9..05317a044d 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index be3d8a2fe6..bbb576e0e7 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.1.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.21.0
+MESOS_VERSION: 0.22.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 10dc9ce890..ce888b5445 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.
Requires Mesos version 0.20.1 or later.
+Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
+tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
+image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
+
# Running Alongside Hadoop
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -335,6 +339,14 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
+ <td>false</td>
+ <td>
+ Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
+ By default Mesos agents will not pull images they already have cached.
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.executor.docker.volumes</code></td>
<td>(none)</td>
<td>
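For reference, the volume and port-mapping settings documented above take spec strings of the form `host_path:container_path[:ro|rw]` and `host_port:container_port[:protocol]`, as exercised by the test cases earlier in this commit. Below is a simplified, illustrative decomposition of a volume spec; this is not the actual `parseVolumesSpec` implementation, which also accepts container-only forms such as `/a` or `/d:ro` and logs malformed entries:

```scala
// Simplified sketch covering only the host:container[:mode] volume form.
def splitVolumeSpec(spec: String): (String, String, String) =
  spec.split(":") match {
    case Array(host, container, mode) => (host, container, mode)
    case Array(host, container)       => (host, container, "rw") // mode defaults to rw
    case _ => sys.error(s"form not handled by this sketch: $spec")
  }

splitVolumeSpec("/host_vol:/container_vol:ro") // ("/host_vol", "/container_vol", "ro")
```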
diff --git a/pom.xml b/pom.xml
index d064cb57dd..b69292d188 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
- <mesos.version>0.21.1</mesos.version>
+ <mesos.version>0.22.2</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>